http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ValueResolver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ValueResolver.java new file mode 100644 index 0000000..7fc112b --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ValueResolver.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.management.ExecutionContext; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskAdaptable; +import org.apache.brooklyn.core.util.flags.TypeCoercions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Maybe; +import brooklyn.util.javalang.JavaClassNames; +import brooklyn.util.repeat.Repeater; +import brooklyn.util.time.CountdownTimer; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Durations; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.reflect.TypeToken; + +/** + * Resolves a given object, as follows: + * <li> If it is a {@link Tasks} or a {@link DeferredSupplier} then get its contents + * <li> If it's a map and {@link #deep(boolean)} is requested, it applies resolution to contents + * <li> It applies coercion + * <p> + * Fluent-style API exposes a number of other options. + */ +public class ValueResolver<T> implements DeferredSupplier<T> { + + /** + * Period to wait if we're expected to return real quick + * but we want fast things to have time to finish. + * <p> + * Timings are always somewhat arbitrary but this at least + * allows some intention to be captured in code rather than arbitrary values. 
*/ + public static Duration REAL_QUICK_WAIT = Duration.millis(50); + /** + * Period to wait if we're expected to return quickly + * but we want to be a bit more generous for things to finish, + * without letting a caller get annoyed. + * <p> + * See {@link #REAL_QUICK_WAIT}. */ + public static Duration PRETTY_QUICK_WAIT = Duration.millis(200); + + /** Period to wait when we have to poll but want to give the illusion of no wait. + * See {@link Repeater#DEFAULT_REAL_QUICK_PERIOD} */ + public static Duration REAL_QUICK_PERIOD = Repeater.DEFAULT_REAL_QUICK_PERIOD; + + private static final Logger log = LoggerFactory.getLogger(ValueResolver.class); + + final Object value; + final Class<T> type; + ExecutionContext exec; + String description; + boolean forceDeep; + /** null means do it if you can; true means always, false means never */ + Boolean embedResolutionInTask; + /** timeout on execution, if possible, or if embedResolutionInTask is true */ + Duration timeout; + boolean isTransientTask = true; + + T defaultValue = null; + boolean returnDefaultOnGet = false; + boolean swallowExceptions = false; + + // internal fields + final Object parentOriginalValue; + final CountdownTimer parentTimer; + AtomicBoolean started = new AtomicBoolean(false); + boolean expired; + + ValueResolver(Object v, Class<T> type) { + this.value = v; + this.type = type; + checkTypeNotNull(); + parentOriginalValue = null; + parentTimer = null; + } + + ValueResolver(Object v, Class<T> type, ValueResolver<?> parent) { + this.value = v; + this.type = type; + checkTypeNotNull(); + + exec = parent.exec; + description = parent.description; + forceDeep = parent.forceDeep; + embedResolutionInTask = parent.embedResolutionInTask; + + parentOriginalValue = parent.getOriginalValue(); + + timeout = parent.timeout; + parentTimer = parent.parentTimer; + if (parentTimer!=null && parentTimer.isExpired()) + expired = true; + + // default value and swallow exceptions do not need to be nested + } + + public static 
class ResolverBuilderPretype { + final Object v; + public ResolverBuilderPretype(Object v) { + this.v = v; + } + public <T> ValueResolver<T> as(Class<T> type) { + return new ValueResolver<T>(v, type); + } + } + + /** returns a copy of this resolver which can be queried, even if the original (single-use instance) has already been copied */ + public ValueResolver<T> clone() { + ValueResolver<T> result = new ValueResolver<T>(value, type) + .context(exec).description(description) + .embedResolutionInTask(embedResolutionInTask) + .deep(forceDeep) + .timeout(timeout); + if (returnDefaultOnGet) result.defaultValue(defaultValue); + if (swallowExceptions) result.swallowExceptions(); + return result; + } + + /** execution context to use when resolving; required if resolving unsubmitted tasks or running with a time limit */ + public ValueResolver<T> context(ExecutionContext exec) { + this.exec = exec; + return this; + } + /** as {@link #context(ExecutionContext)} for use from an entity */ + public ValueResolver<T> context(Entity entity) { + return context(entity!=null ? ((EntityInternal)entity).getExecutionContext() : null); + } + + /** sets a message which will be displayed in status reports while it waits (e.g. 
the name of the config key being looked up) */ + public ValueResolver<T> description(String description) { + this.description = description; + return this; + } + + /** sets a default value which will be returned on a call to {@link #get()} if the task does not complete + * or completes with an error + * <p> + * note that {@link #getMaybe()} returns an absent object even in the presence of + * a default, so that any error can still be accessed */ + public ValueResolver<T> defaultValue(T defaultValue) { + this.defaultValue = defaultValue; + this.returnDefaultOnGet = true; + return this; + } + + /** indicates that no default value should be returned on a call to {@link #get()}, and instead it should throw + * (this is the default; this method is provided to undo a call to {@link #defaultValue(Object)}) */ + public ValueResolver<T> noDefaultValue() { + this.returnDefaultOnGet = false; + this.defaultValue = null; + return this; + } + + /** indicates that exceptions in resolution should not be thrown on a call to {@link #getMaybe()}, + * but rather used as part of the {@link Maybe#get()} if it's absent, + * and swallowed altogether on a call to {@link #get()} in the presence of a {@link #defaultValue(Object)} */ + public ValueResolver<T> swallowExceptions() { + this.swallowExceptions = true; + return this; + } + + /** whether the task should be marked as transient; defaults true */ + public ValueResolver<T> transientTask(boolean isTransientTask) { + this.isTransientTask = isTransientTask; + return this; + } + + public Maybe<T> getDefault() { + if (returnDefaultOnGet) return Maybe.of(defaultValue); + else return Maybe.absent("No default value set"); + } + + /** causes nested structures (maps, lists) to be descended and nested unresolved values resolved */ + public ValueResolver<T> deep(boolean forceDeep) { + this.forceDeep = forceDeep; + return this; + } + + /** if true, forces execution of a deferred supplier to be run in a task; + * if false, it prevents it (meaning 
time limits may not be applied); + * if null, the default, it runs in a task if a time limit is applied. + * <p> + * running inside a task is required for some {@link DeferredSupplier} + * instances which look up a task {@link ExecutionContext}. */ + public ValueResolver<T> embedResolutionInTask(Boolean embedResolutionInTask) { + this.embedResolutionInTask = embedResolutionInTask; + return this; + } + + /** sets a time limit on executions + * <p> + * used for {@link Task} and {@link DeferredSupplier} instances. + * may require an execution context at runtime. */ + public ValueResolver<T> timeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + protected void checkTypeNotNull() { + if (type==null) + throw new NullPointerException("type must be set to resolve, for '"+value+"'"+(description!=null ? ", "+description : "")); + } + + public T get() { + Maybe<T> m = getMaybe(); + if (m.isPresent()) return m.get(); + if (returnDefaultOnGet) return defaultValue; + return m.get(); + } + + public Maybe<T> getMaybe() { + Maybe<T> result = getMaybeInternal(); + if (log.isTraceEnabled()) { + log.trace(this+" evaluated as "+result); + } + return result; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + protected Maybe<T> getMaybeInternal() { + if (started.getAndSet(true)) + throw new IllegalStateException("ValueResolver can only be used once"); + + if (expired) return Maybe.absent("Nested resolution of "+getOriginalValue()+" did not complete within "+timeout); + + ExecutionContext exec = this.exec; + if (exec==null) { + // if execution context not specified, take it from the current task if present + exec = BasicExecutionContext.getCurrentExecutionContext(); + } + + CountdownTimer timerU = parentTimer; + if (timerU==null && timeout!=null) + timerU = timeout.countdownTimer(); + final CountdownTimer timer = timerU; + if (timer!=null && !timer.isRunning()) + timer.start(); + + checkTypeNotNull(); + Object v = this.value; + + //if the expected type is a 
closure or map and that's what we have, we're done (or if it's null); + //but not allowed to return a future or DeferredSupplier as the resolved value + if (v==null || (!forceDeep && type.isInstance(v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v))) + return Maybe.of((T) v); + + try { + //if it's a task or a future, we wait for the task to complete + if (v instanceof TaskAdaptable<?>) { + //if it's a task, we make sure it is submitted + if (!((TaskAdaptable<?>) v).asTask().isSubmitted() ) { + if (exec==null) + return Maybe.absent("Value for unsubmitted task '"+getDescription()+"' requested but no execution context available"); + exec.submit(((TaskAdaptable<?>) v).asTask()); + } + } + + if (v instanceof Future) { + final Future<?> vfuture = (Future<?>) v; + + //including tasks, above + if (!vfuture.isDone()) { + Callable<Maybe> callable = new Callable<Maybe>() { + public Maybe call() throws Exception { + return Durations.get(vfuture, timer); + } }; + + String description = getDescription(); + Maybe vm = Tasks.withBlockingDetails("Waiting for "+description, callable); + if (vm.isAbsent()) return vm; + v = vm.get(); + + } else { + v = vfuture.get(); + + } + + } else if (v instanceof DeferredSupplier<?>) { + final Object vf = v; + + if ((!Boolean.FALSE.equals(embedResolutionInTask) && (exec!=null || timeout!=null)) || Boolean.TRUE.equals(embedResolutionInTask)) { + if (exec==null) + return Maybe.absent("Embedding in task needed for '"+getDescription()+"' but no execution context available"); + + Callable<Object> callable = new Callable<Object>() { + public Object call() throws Exception { + try { + Tasks.setBlockingDetails("Retrieving "+vf); + return ((DeferredSupplier<?>) vf).get(); + } finally { + Tasks.resetBlockingDetails(); + } + } }; + String description = getDescription(); + TaskBuilder<Object> vb = Tasks.<Object>builder().body(callable).name("Resolving dependent value").description(description); + if (isTransientTask) 
vb.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG); + Task<Object> vt = exec.submit(vb.build()); + // TODO to handle immediate resolution, it would be nice to be able to submit + // so it executes in the current thread, + // or put a marker in the target thread or task while it is running that the task + // should never wait on anything other than another value being resolved + // (though either could recurse infinitely) + Maybe<Object> vm = Durations.get(vt, timer); + vt.cancel(true); + if (vm.isAbsent()) return (Maybe<T>)vm; + v = vm.get(); + + } else { + try { + Tasks.setBlockingDetails("Retrieving (non-task) "+vf); + v = ((DeferredSupplier<?>) vf).get(); + } finally { + Tasks.resetBlockingDetails(); + } + } + + } else if (v instanceof Map) { + //and if a map or list we look inside + Map result = Maps.newLinkedHashMap(); + for (Map.Entry<?,?> entry : ((Map<?,?>)v).entrySet()) { + Maybe<?> kk = new ValueResolver(entry.getKey(), type, this) + .description( (description!=null ? description+", " : "") + "map key "+entry.getKey() ) + .getMaybe(); + if (kk.isAbsent()) return (Maybe<T>)kk; + Maybe<?> vv = new ValueResolver(entry.getValue(), type, this) + .description( (description!=null ? description+", " : "") + "map value for key "+kk.get() ) + .getMaybe(); + if (vv.isAbsent()) return (Maybe<T>)vv; + result.put(kk.get(), vv.get()); + } + return Maybe.of((T) result); + + } else if (v instanceof Set) { + Set result = Sets.newLinkedHashSet(); + int count = 0; + for (Object it : (Set)v) { + Maybe<?> vv = new ValueResolver(it, type, this) + .description( (description!=null ? description+", " : "") + "entry "+count ) + .getMaybe(); + if (vv.isAbsent()) return (Maybe<T>)vv; + result.add(vv.get()); + count++; + } + return Maybe.of((T) result); + + } else if (v instanceof Iterable) { + List result = Lists.newArrayList(); + int count = 0; + for (Object it : (Iterable)v) { + Maybe<?> vv = new ValueResolver(it, type, this) + .description( (description!=null ? 
description+", " : "") + "entry "+count ) + .getMaybe(); + if (vv.isAbsent()) return (Maybe<T>)vv; + result.add(vv.get()); + count++; + } + return Maybe.of((T) result); + + } else { + return TypeCoercions.tryCoerce(v, TypeToken.of(type)); + } + + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + + IllegalArgumentException problem = new IllegalArgumentException("Error resolving "+(description!=null ? description+", " : "")+v+", in "+exec+": "+e, e); + if (swallowExceptions) { + if (log.isDebugEnabled()) + log.debug("Resolution of "+this+" failed, swallowing and returning: "+e); + return Maybe.absent(problem); + } + if (log.isDebugEnabled()) + log.debug("Resolution of "+this+" failed, throwing: "+e); + throw problem; + } + + return new ValueResolver(v, type, this).getMaybe(); + } + + protected String getDescription() { + return description!=null ? description : ""+value; + } + protected Object getOriginalValue() { + if (parentOriginalValue!=null) return parentOriginalValue; + return value; + } + + @Override + public String toString() { + return JavaClassNames.cleanSimpleClassName(this)+"["+JavaClassNames.cleanSimpleClassName(type)+" "+value+"]"; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskFactory.java new file mode 100644 index 0000000..a38e305 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskFactory.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task.ssh; + +import org.apache.brooklyn.api.management.TaskFactory; +import org.apache.brooklyn.core.util.config.ConfigBag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +// cannot be (cleanly) instantiated due to nested generic self-referential type; however trivial subclasses do allow it +public class SshFetchTaskFactory implements TaskFactory<SshFetchTaskWrapper> { + + private static final Logger log = LoggerFactory.getLogger(SshFetchTaskFactory.class); + + private boolean dirty = false; + + protected SshMachineLocation machine; + protected String remoteFile; + protected final ConfigBag config = ConfigBag.newInstance(); + + /** constructor where machine will be added later */ + public SshFetchTaskFactory(String remoteFile) { + remoteFile(remoteFile); + } + + /** convenience constructor to supply machine immediately */ + public SshFetchTaskFactory(SshMachineLocation machine, String remoteFile) { + machine(machine); + remoteFile(remoteFile); + } + + protected SshFetchTaskFactory self() { return this; } + + protected void markDirty() { + dirty = true; + } + + public SshFetchTaskFactory machine(SshMachineLocation machine) { + markDirty(); + this.machine = machine; + return self(); + } + + public SshMachineLocation getMachine() { + return machine; + } + + public SshFetchTaskFactory remoteFile(String remoteFile) { + this.remoteFile = remoteFile; + return self(); + } + + public ConfigBag getConfig() { + return config; + } + + @Override + public SshFetchTaskWrapper newTask() { + dirty = false; + return new SshFetchTaskWrapper(this); + } + + @Override + protected void finalize() throws Throwable { + // help let people know of API usage error + if (dirty) + log.warn("Task "+this+" was modified but modification was never used"); + super.finalize(); + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskWrapper.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskWrapper.java new file mode 100644 index 0000000..b6c2931 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshFetchTaskWrapper.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task.ssh; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskWrapper; +import org.apache.brooklyn.core.util.config.ConfigBag; +import org.apache.brooklyn.core.util.task.TaskBuilder; +import org.apache.brooklyn.core.util.task.Tasks; +import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FilenameUtils; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.os.Os; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; + +/** + * As {@link ProcessTaskWrapper}, but putting a file on the remote machine + * + * @since 0.6.0 + */ +@Beta +public class SshFetchTaskWrapper implements TaskWrapper<String> { + + private final Task<String> task; + + private final String remoteFile; + private final SshMachineLocation machine; + private File backingFile; + private final ConfigBag config; + + + // package private as only AbstractSshTaskFactory should invoke + SshFetchTaskWrapper(SshFetchTaskFactory factory) { + this.remoteFile = Preconditions.checkNotNull(factory.remoteFile, "remoteFile"); + this.machine = Preconditions.checkNotNull(factory.machine, "machine"); + TaskBuilder<String> tb = TaskBuilder.<String>builder().dynamic(false).name("ssh fetch "+factory.remoteFile); + task = tb.body(new SshFetchJob()).build(); + config = factory.getConfig(); + } + + @Override + public Task<String> asTask() { + return getTask(); + } + + @Override + public Task<String> getTask() { + return task; + } + + public String getRemoteFile() { + return remoteFile; + } + + public SshMachineLocation getMachine() { + return machine; + } + + private class SshFetchJob implements Callable<String> { + @Override + public String call() 
throws Exception { + int result = -1; + try { + Preconditions.checkNotNull(getMachine(), "machine"); + backingFile = Os.newTempFile("brooklyn-ssh-fetch-", FilenameUtils.getName(remoteFile)); + backingFile.deleteOnExit(); + + result = getMachine().copyFrom(config.getAllConfig(), remoteFile, backingFile.getPath()); + } catch (Exception e) { + throw new IllegalStateException("SSH fetch "+getRemoteFile()+" from "+getMachine()+" returned threw exception, in "+Tasks.current()+": "+e, e); + } + if (result!=0) { + throw new IllegalStateException("SSH fetch "+getRemoteFile()+" from "+getMachine()+" returned non-zero exit code "+result+", in "+Tasks.current()); + } + return FileUtils.readFileToString(backingFile); + } + } + + @Override + public String toString() { + return super.toString()+"["+task+"]"; + } + + /** blocks, returns the fetched file as a string, throwing if there was an exception */ + public String get() { + return getTask().getUnchecked(); + } + + /** blocks, returns the fetched file as bytes, throwing if there was an exception */ + public byte[] getBytes() { + block(); + try { + return FileUtils.readFileToByteArray(backingFile); + } catch (IOException e) { + throw Exceptions.propagate(e); + } + } + + /** blocks until the task completes; does not throw */ + public SshFetchTaskWrapper block() { + getTask().blockUntilEnded(); + return this; + } + + /** true iff the ssh job has completed (with or without failure) */ + public boolean isDone() { + return getTask().isDone(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskFactory.java new file mode 100644 index 0000000..05cc42e --- /dev/null +++ 
b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskFactory.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task.ssh; + +import java.io.InputStream; +import java.io.Reader; + +import org.apache.brooklyn.api.management.TaskFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.stream.KnownSizeInputStream; +import brooklyn.util.stream.ReaderInputStream; + +import com.google.common.base.Suppliers; + +// cannot be (cleanly) instantiated due to nested generic self-referential type; however trivial subclasses do allow it +public class SshPutTaskFactory extends SshPutTaskStub implements TaskFactory<SshPutTaskWrapper> { + + private static final Logger log = LoggerFactory.getLogger(SshPutTaskFactory.class); + + private boolean dirty = false; + + /** constructor where machine will be added later */ + public SshPutTaskFactory(String remoteFile) { + remoteFile(remoteFile); + } + + /** convenience constructor to supply machine immediately */ + public SshPutTaskFactory(SshMachineLocation machine, String remoteFile) { + machine(machine); + 
remoteFile(remoteFile); + } + + protected SshPutTaskFactory self() { return this; } + + protected void markDirty() { + dirty = true; + } + + public SshPutTaskFactory machine(SshMachineLocation machine) { + markDirty(); + this.machine = machine; + return self(); + } + + public SshPutTaskFactory remoteFile(String remoteFile) { + this.remoteFile = remoteFile; + return self(); + } + + public SshPutTaskFactory summary(String summary) { + markDirty(); + this.summary = summary; + return self(); + } + + public SshPutTaskFactory contents(String contents) { + markDirty(); + this.contents = Suppliers.ofInstance(KnownSizeInputStream.of(contents)); + return self(); + } + + public SshPutTaskFactory contents(byte[] contents) { + markDirty(); + this.contents = Suppliers.ofInstance(KnownSizeInputStream.of(contents)); + return self(); + } + + public SshPutTaskFactory contents(InputStream stream) { + markDirty(); + this.contents = Suppliers.ofInstance(stream); + return self(); + } + + public SshPutTaskFactory contents(Reader reader) { + markDirty(); + this.contents = Suppliers.ofInstance(new ReaderInputStream(reader)); + return self(); + } + + public SshPutTaskFactory allowFailure() { + markDirty(); + allowFailure = true; + return self(); + } + + public SshPutTaskFactory createDirectory() { + markDirty(); + createDirectory = true; + return self(); + } + + public SshPutTaskWrapper newTask() { + dirty = false; + return new SshPutTaskWrapper(this); + } + + @Override + protected void finalize() throws Throwable { + // help let people know of API usage error + if (dirty) + log.warn("Task "+this+" was modified but modification was never used"); + super.finalize(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskStub.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskStub.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskStub.java new file mode 100644 index 0000000..4e3a024 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskStub.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task.ssh; + +import java.io.InputStream; + +import org.apache.brooklyn.core.util.config.ConfigBag; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import com.google.common.base.Supplier; + +public class SshPutTaskStub { + + protected String remoteFile; + protected SshMachineLocation machine; + protected Supplier<? 
extends InputStream> contents; + protected String summary; + protected String permissions; + protected boolean allowFailure = false; + protected boolean createDirectory = false; + protected final ConfigBag config = ConfigBag.newInstance(); + + protected SshPutTaskStub() { + } + + protected SshPutTaskStub(SshPutTaskStub constructor) { + this.remoteFile = constructor.remoteFile; + this.machine = constructor.machine; + this.contents = constructor.contents; + this.summary = constructor.summary; + this.allowFailure = constructor.allowFailure; + this.createDirectory = constructor.createDirectory; + this.permissions = constructor.permissions; + this.config.copy(constructor.config); + } + + public String getRemoteFile() { + return remoteFile; + } + + public String getSummary() { + if (summary!=null) return summary; + return "scp put: "+remoteFile; + } + + public SshMachineLocation getMachine() { + return machine; + } + + protected ConfigBag getConfig() { + return config; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskWrapper.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskWrapper.java new file mode 100644 index 0000000..13449d0 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshPutTaskWrapper.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.task.ssh;
+
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskWrapper;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.core.util.internal.ssh.SshTool;
+import org.apache.brooklyn.core.util.task.TaskBuilder;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+
+/**
+ * As {@link ProcessTaskWrapper}, but putting a file on the remote machine.
+ * <p>
+ * The wrapped task runs an {@code SshPutJob}; its outcome is exposed through
+ * {@link #getExitCode()}, {@link #getException()} and {@link #isSuccessful()}.
+ */
+@Beta
+public class SshPutTaskWrapper extends SshPutTaskStub implements TaskWrapper<Void> {
+
+    private static final Logger log = LoggerFactory.getLogger(SshPutTaskWrapper.class);
+
+    /** the task built in the constructor; its body is an {@code SshPutJob} */
+    private final Task<Void> task;
+
+    /** value returned by {@code copyTo}; stays null until the copy call itself has returned */
+    protected Integer exitCodeOfCopy = null;
+    /** most recent exception recorded by the job (directory-creation or copy stage); null if none */
+    protected Exception exception = null;
+    /** set to true only when the job finishes with no recorded exception and a zero copy exit code */
+    protected boolean successful = false;
+
+    // package private as only AbstractSshTaskFactory should invoke
+    /** copies the settings held by the factory and builds a non-dynamic task, named after {@link #getSummary()}, that runs the put */
+    SshPutTaskWrapper(SshPutTaskFactory constructor) {
+        super(constructor);
+        TaskBuilder<Void> tb = TaskBuilder.<Void>builder().dynamic(false).name(getSummary());
+        task = tb.body(new SshPutJob()).build();
+    }
+
+    /** same as {@link #getTask()} */
+    @Override
+    public Task<Void> asTask() {
+        return getTask();
+    }
+
+    /** returns the task created in the constructor */
+    @Override
+    public Task<Void> getTask() {
+        return task;
+    }
+
+    // TODO:
+    // verify
+    // copyAsRoot
+    // owner
+    // lastModificationDate - see {@link #PROP_LAST_MODIFICATION_DATE}; not supported by all SshTool implementations
+    // lastAccessDate - see {@link #PROP_LAST_ACCESS_DATE}; not supported by all SshTool implementations
+
+    /** body of the task: optionally creates the parent directory, then copies the contents to the remote file */
+    private class SshPutJob implements Callable<Void> {
+        /**
+         * Runs the put. Always returns null; the outcome is recorded in the enclosing wrapper's
+         * {@code exception}, {@code exitCodeOfCopy} and {@code successful} fields.
+         * <p>
+         * A recorded exception or a non-zero exit code is thrown as an {@link IllegalStateException}
+         * unless {@code allowFailure} is set, in which case the method returns quietly
+         * with {@code successful} left false.
+         */
+        @Override
+        public Void call() throws Exception {
+            try {
+                Preconditions.checkNotNull(getMachine(), "machine");
+
+                String remoteFile = getRemoteFile();
+
+                if (createDirectory) {
+                    String remoteDir = remoteFile;
+                    // -1 is a placeholder; it is only left in place if execCommands throws
+                    int exitCodeOfCreate = -1;
+                    try {
+                        int li = remoteDir.lastIndexOf("/");
+                        if (li>=0) {
+                            // keep everything up to and including the last '/'
+                            remoteDir = remoteDir.substring(0, li+1);
+                            // NOTE(review): remoteDir is concatenated into the shell command unquoted;
+                            // confirm paths containing spaces or shell metacharacters cannot reach here
+                            exitCodeOfCreate = getMachine().execCommands("creating directory for "+getSummary(),
+                                    Arrays.asList("mkdir -p "+remoteDir));
+                        } else {
+                            // nothing to create
+                            exitCodeOfCreate = 0;
+                        }
+                    } catch (Exception e) {
+                        if (log.isDebugEnabled())
+                            log.debug("SSH put "+getRemoteFile()+" (create dir, in task "+getSummary()+") to "+getMachine()+" threw exception: "+e);
+                        exception = e;
+                    }
+                    if (exception!=null || !((Integer)0).equals(exitCodeOfCreate)) {
+                        if (!allowFailure) {
+                            // NOTE(review): both throws below happen inside the outer try, so the outer catch
+                            // re-captures them into `exception` (logging a second time) and the block after the
+                            // try wraps them in another IllegalStateException; confirm the double wrapping is intended
+                            if (exception != null) {
+                                throw new IllegalStateException(getSummary()+" (creating dir "+remoteDir+" for SSH put task) ended with exception, in "+Tasks.current()+": "+exception, exception);
+                            }
+                            if (exitCodeOfCreate!=0) {
+                                exception = new IllegalStateException(getSummary()+" (creating dir "+remoteDir+" SSH put task) ended with exit code "+exitCodeOfCreate+", in "+Tasks.current());
+                                throw exception;
+                            }
+                        }
+                        // not successful, but allowed
+                        // (returns before any copy is attempted, so exitCodeOfCopy stays null)
+                        return null;
+                    }
+                }
+
+                // work on a copy so the permissions entry does not leak into the wrapper's own config
+                ConfigBag config = ConfigBag.newInstanceCopying(getConfig());
+                if (permissions!=null) config.put(SshTool.PROP_PERMISSIONS, permissions);
+
+                exitCodeOfCopy = getMachine().copyTo(config.getAllConfig(), contents.get(), remoteFile);
+
+                if (log.isDebugEnabled())
+                    log.debug("SSH put "+getRemoteFile()+" (task "+getSummary()+") to "+getMachine()+" completed with exit code "+exitCodeOfCopy);
+            } catch (Exception e) {
+                if (log.isDebugEnabled())
+                    log.debug("SSH put "+getRemoteFile()+" (task "+getSummary()+") to "+getMachine()+" threw exception: "+e);
+                exception = e;
+            }
+
+            // a null exitCodeOfCopy (copy never returned) also counts as "not zero" here
+            if (exception!=null || !((Integer)0).equals(exitCodeOfCopy)) {
+                if (!allowFailure) {
+                    if (exception != null) {
+                        throw new IllegalStateException(getSummary()+" (SSH put task) ended with exception, in "+Tasks.current()+": "+exception, exception);
+                    }
+                    if (exitCodeOfCopy!=0) {
+                        exception = new IllegalStateException(getSummary()+" (SSH put task) ended with exit code "+exitCodeOfCopy+", in "+Tasks.current());
+                        throw exception;
+                    }
+                }
+                // not successful, but allowed
+                return null;
+            }
+
+            // TODO verify
+
+            successful = (exception==null && ((Integer)0).equals(exitCodeOfCopy));
+            return null;
+        }
+    }
+
+    /** appends the wrapped task to the inherited string form */
+    @Override
+    public String toString() {
+        return super.toString()+"["+task+"]";
+    }
+
+    /** blocks, throwing if there was an exception */
+    public Void get() {
+        return getTask().getUnchecked();
+    }
+
+    /** returns the exit code from the copy, 0 on success;
+     * null if it has not completed or threw exception
+     * (not sure if this is ever a non-zero integer or if it is meaningful)
+     * <p>
+     * most callers will want the simpler {@link #isSuccessful()} */
+    public Integer getExitCode() {
+        return exitCodeOfCopy;
+    }
+
+    /** returns any exception encountered in the operation */
+    public Exception getException() {
+        return exception;
+    }
+
+    /** blocks until the task completes; does not throw */
+    public SshPutTaskWrapper block() {
+        getTask().blockUntilEnded();
+        return this;
+    }
+
+    /** true iff the ssh job has completed (with or without failure) */
+    public boolean isDone() {
+        return getTask().isDone();
+    }
+
+    /** true iff the scp has completed successfully; guaranteed to be set before {@link #isDone()} or {@link #block()} are satisfied */
+    public boolean isSuccessful() {
+        return successful;
+    }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshTasks.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshTasks.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshTasks.java new file mode 100644 index 0000000..5f8d735 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/SshTasks.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.brooklyn.core.util.task.ssh;
+
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.TaskAdaptable;
+import org.apache.brooklyn.api.management.TaskFactory;
+import org.apache.brooklyn.api.management.TaskQueueingContext;
+import org.apache.brooklyn.core.util.ResourceUtils;
+import org.apache.brooklyn.core.util.config.ConfigBag;
+import org.apache.brooklyn.core.util.internal.ssh.SshTool;
+import org.apache.brooklyn.core.util.task.DynamicTasks;
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.apache.brooklyn.core.util.task.ssh.internal.PlainSshExecTaskFactory;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.config.ConfigUtils;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.ConfigKeys;
+
+import org.apache.brooklyn.location.basic.AbstractLocation;
+import org.apache.brooklyn.location.basic.LocationInternal;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+
+import brooklyn.util.net.Urls;
+import brooklyn.util.ssh.BashCommands;
+import brooklyn.util.stream.Streams;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.text.Strings;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/**
+ * Conveniences for generating {@link Task} instances to perform SSH activities on an {@link SshMachineLocation}.
+ * <p>
+ * To infer the {@link SshMachineLocation} and take properties from entities and global management context the
+ * {@link SshEffectorTasks} should be preferred over this class.
+ *
+ * @see SshEffectorTasks
+ * @since 0.6.0
+ */
+@Beta
+public class SshTasks {
+
+    private static final Logger log = LoggerFactory.getLogger(SshTasks.class);
+
+    /** as {@link #newSshExecTaskFactory(SshMachineLocation, boolean, String...)} with {@code useMachineConfig} set to true */
+    public static ProcessTaskFactory<Integer> newSshExecTaskFactory(SshMachineLocation machine, String ...commands) {
+        return newSshExecTaskFactory(machine, true, commands);
+    }
+
+    /** creates a factory for tasks running the given commands on the machine;
+     * if {@code useMachineConfig} is true, the flags from {@link #getSshFlags(Location)} are added to
+     * the factory's config for any key not already present */
+    public static ProcessTaskFactory<Integer> newSshExecTaskFactory(SshMachineLocation machine, final boolean useMachineConfig, String ...commands) {
+        return new PlainSshExecTaskFactory<Integer>(machine, commands) {
+            {
+                // NOTE(review): the parameter is not final, so `machine` here presumably resolves to the
+                // field inherited by the factory (set by its constructor) rather than the argument; confirm
+                if (useMachineConfig)
+                    config.putIfAbsent(getSshFlags(machine));
+            }
+        };
+    }
+
+    /** as {@link #newSshPutTaskFactory(SshMachineLocation, boolean, String)} with {@code useMachineConfig} set to true */
+    public static SshPutTaskFactory newSshPutTaskFactory(SshMachineLocation machine, String remoteFile) {
+        return newSshPutTaskFactory(machine, true, remoteFile);
+    }
+
+    /** creates a factory for tasks putting a file at {@code remoteFile} on the machine;
+     * {@code useMachineConfig} behaves as for the exec task factory */
+    public static SshPutTaskFactory newSshPutTaskFactory(SshMachineLocation machine, final boolean useMachineConfig, String remoteFile) {
+        return new SshPutTaskFactory(machine, remoteFile) {
+            {
+                if (useMachineConfig)
+                    config.putIfAbsent(getSshFlags(machine));
+            }
+        };
+    }
+
+    /** as {@link #newSshFetchTaskFactory(SshMachineLocation, boolean, String)} with {@code useMachineConfig} set to true */
+    public static SshFetchTaskFactory newSshFetchTaskFactory(SshMachineLocation machine, String remoteFile) {
+        return newSshFetchTaskFactory(machine, true, remoteFile);
+    }
+
+    /** creates a factory for tasks fetching {@code remoteFile} from the machine;
+     * {@code useMachineConfig} behaves as for the exec task factory */
+    public static SshFetchTaskFactory newSshFetchTaskFactory(SshMachineLocation machine, final boolean useMachineConfig, String remoteFile) {
+        return new SshFetchTaskFactory(machine, remoteFile) {
+            {
+                if (useMachineConfig)
+                    config.putIfAbsent(getSshFlags(machine));
+            }
+        };
+    }
+
+    /** collects config whose key name starts with {@link SshTool#BROOKLYN_CONFIG_KEY_PREFIX}, returning it keyed
+     * by the unprefixed name; management-context config is loaded first (only when the location is an
+     * {@link AbstractLocation} with a management context) and the location's own config is put on top of it */
+    private static Map<String, Object> getSshFlags(Location location) {
+        ConfigBag allConfig = ConfigBag.newInstance();
+
+        if (location instanceof AbstractLocation) {
+            ManagementContext mgmt = ((AbstractLocation)location).getManagementContext();
+            if (mgmt!=null)
+                allConfig.putAll(mgmt.getConfig().getAllConfig());
+        }
+
+        // NOTE(review): unconditional cast; a Location that is not a LocationInternal fails here with ClassCastException
+        allConfig.putAll(((LocationInternal)location).config().getBag());
+
+        Map<String, Object> result = Maps.newLinkedHashMap();
+        for (String keyS : allConfig.getAllConfig().keySet()) {
+            ConfigKey<?> key = ConfigKeys.newConfigKey(Object.class, keyS);
+            if (key.getName().startsWith(SshTool.BROOKLYN_CONFIG_KEY_PREFIX)) {
+                result.put(ConfigUtils.unprefixedKey(SshTool.BROOKLYN_CONFIG_KEY_PREFIX, key).getName(), allConfig.get(key));
+            }
+        }
+        return result;
+    }
+
+    /** what a task should do when the check it performs fails;
+     * see {@link #dontRequireTtyForSudo(SshMachineLocation, OnFailingTask)} for the one use in this class */
+    @Beta
+    public static enum OnFailingTask {
+        /** the failure is propagated (in this class, as an {@link IllegalStateException}) */
+        FAIL,
+        /** issues a warning, sometimes implemented as marking the task inessential and failing it if it appears
+         * we are in a dynamic {@link TaskQueueingContext};
+         * useful because this way the warning appears to the user;
+         * but note that the check is done against the calling thread so use with some care
+         * (and thus this enum is currently here rather than elsewhere) */
+        WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL,
+        /** issues a warning in the log if the task fails, otherwise swallows it */
+        WARN_IN_LOG_ONLY,
+        /** not even a warning if the task fails (the caller is expected to handle it as appropriate) */
+        IGNORE }
+
+    /** as {@link #dontRequireTtyForSudo(SshMachineLocation, OnFailingTask)}, using {@link OnFailingTask#FAIL}
+     * when {@code failIfCantSudo} is true and {@link OnFailingTask#WARN_IN_LOG_ONLY} otherwise */
+    public static ProcessTaskFactory<Boolean> dontRequireTtyForSudo(SshMachineLocation machine, final boolean failIfCantSudo) {
+        return dontRequireTtyForSudo(machine, failIfCantSudo ? OnFailingTask.FAIL : OnFailingTask.WARN_IN_LOG_ONLY);
+    }
+    /** creates a task which modifies sudoers to ensure non-tty access is permitted;
+     * also gives nice warnings if sudo is not permitted.
+     * <p>
+     * The task returns true when the exit code is zero and stdout contains the expected marker;
+     * otherwise it returns false or throws, depending on the requested {@link OnFailingTask} mode. */
+    public static ProcessTaskFactory<Boolean> dontRequireTtyForSudo(SshMachineLocation machine, OnFailingTask onFailingTaskRequested) {
+        // resolved once here, on the calling thread: without a task queueing context the
+        // "mark inessential" mode is downgraded to a log-only warning
+        final OnFailingTask onFailingTask;
+        if (onFailingTaskRequested==OnFailingTask.WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL) {
+            if (DynamicTasks.getTaskQueuingContext()!=null)
+                onFailingTask = onFailingTaskRequested;
+            else
+                onFailingTask = OnFailingTask.WARN_IN_LOG_ONLY;
+        } else {
+            onFailingTask = onFailingTaskRequested;
+        }
+
+        // random suffix for the marker that the success check looks for in stdout
+        final String id = Identifiers.makeRandomId(6);
+        return newSshExecTaskFactory(machine,
+                BashCommands.dontRequireTtyForSudo(),
+                // strange quotes are to ensure we don't match against echoed stdin
+                BashCommands.sudo("echo \"sudo\"-is-working-"+id))
+            .summary("setting up sudo")
+            .configure(SshTool.PROP_ALLOCATE_PTY, true)
+            .allowingNonZeroExitCode()
+            .returning(new Function<ProcessTaskWrapper<?>,Boolean>() { public Boolean apply(ProcessTaskWrapper<?> task) {
+                if (task.getExitCode()==0 && task.getStdout().contains("sudo-is-working-"+id)) return true;
+                Entity entity = BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
+
+
+                if (onFailingTask!=OnFailingTask.IGNORE) {
+                    // TODO if in a queueing context can we mark this task inessential and throw?
+                    // that way user sees the message...
+                    // NOTE(review): the two adjacent literals below put two spaces before "(exit code"
+                    String message = "Error setting up sudo for "+task.getMachine().getUser()+"@"+task.getMachine().getAddress().getHostName()+" "+
+                        " (exit code "+task.getExitCode()+(entity!=null ? ", entity "+entity : "")+")";
+                    DynamicTasks.queueIfPossible(Tasks.warning(message, null));
+                }
+                // the stderr tail is logged in every mode, including IGNORE
+                Streams.logStreamTail(log, "STDERR of sudo setup problem", Streams.byteArrayOfString(task.getStderr()), 1024);
+
+                if (onFailingTask==OnFailingTask.WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL) {
+                    Tasks.markInessential();
+                }
+                if (onFailingTask==OnFailingTask.FAIL || onFailingTask==OnFailingTask.WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL) {
+                    throw new IllegalStateException("Passwordless sudo is required for "+task.getMachine().getUser()+"@"+task.getMachine().getAddress().getHostName()+
+                            (entity!=null ? " ("+entity+")" : ""));
+                }
+                return false;
+            } });
+    }
+
+    /** Function for use in {@link ProcessTaskFactory#returning(Function)} which logs all information, optionally requires zero exit code,
+     * and then returns stdout.
+     * <p>
+     * Logging is skipped when {@code logger} is null; a non-zero exit code with {@code requireZero} set
+     * causes an {@link IllegalStateException}. */
+    public static Function<ProcessTaskWrapper<?>, String> returningStdoutLoggingInfo(final Logger logger, final boolean requireZero) {
+        return new Function<ProcessTaskWrapper<?>, String>() {
+            // NOTE(review): input is annotated @Nullable but is dereferenced unconditionally below
+            public String apply(@Nullable ProcessTaskWrapper<?> input) {
+                if (logger!=null) logger.info(input+" COMMANDS:\n"+Strings.join(input.getCommands(),"\n"));
+                if (logger!=null) logger.info(input+" STDOUT:\n"+input.getStdout());
+                if (logger!=null) logger.info(input+" STDERR:\n"+input.getStderr());
+                if (requireZero && input.getExitCode()!=0)
+                    throw new IllegalStateException("non-zero exit code in "+input.getSummary()+": see log for more details!");
+                return input.getStdout();
+            }
+        };
+    }
+
+    /** task to install a file given a url, where the url is resolved remotely first then locally;
+     * uses a {@link ResourceUtils} created for this class and no extra properties */
+    public static TaskFactory<?> installFromUrl(final SshMachineLocation location, final String url, final String destPath) {
+        return installFromUrl(ResourceUtils.create(SshTasks.class), ImmutableMap.<String,Object>of(), location, url, destPath);
+    }
+    /** task to install a file given a url, where the url is resolved remotely first then locally;
+     * the task throws {@link IllegalStateException} if {@code installTo} returns a non-zero result */
+    public static TaskFactory<?> installFromUrl(final ResourceUtils utils, final Map<String, ?> props, final SshMachineLocation location, final String url, final String destPath) {
+        return new TaskFactory<TaskAdaptable<?>>() {
+            @Override
+            public TaskAdaptable<?> newTask() {
+                return Tasks.<Void>builder().name("installing "+Urls.getBasename(url)).description("installing "+url+" to "+destPath).body(new Runnable() {
+                    @Override
+                    public void run() {
+                        int result = location.installTo(utils, props, url, destPath);
+                        if (result!=0)
+                            throw new IllegalStateException("Failed to install '"+url+"' to '"+destPath+"' at "+location+": exit code "+result);
+                    }
+                }).build();
+            }
+        };
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/AbstractSshExecTaskFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/AbstractSshExecTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/AbstractSshExecTaskFactory.java
new file mode 100644
index 0000000..45600d5
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/AbstractSshExecTaskFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task.ssh.internal; + +import com.google.common.base.Preconditions; + +import org.apache.brooklyn.core.util.config.ConfigBag; +import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory; +import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper; +import org.apache.brooklyn.core.util.task.system.internal.AbstractProcessTaskFactory; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +// cannot be (cleanly) instantiated due to nested generic self-referential type; however trivial subclasses do allow it +public abstract class AbstractSshExecTaskFactory<T extends AbstractProcessTaskFactory<T,RET>,RET> extends AbstractProcessTaskFactory<T,RET> implements ProcessTaskFactory<RET> { + + /** constructor where machine will be added later */ + public AbstractSshExecTaskFactory(String ...commands) { + super(commands); + } + + /** convenience constructor to supply machine immediately */ + public AbstractSshExecTaskFactory(SshMachineLocation machine, String ...commands) { + this(commands); + machine(machine); + } + + @Override + public ProcessTaskWrapper<RET> newTask() { + dirty = false; + return new ProcessTaskWrapper<RET>(this) { + protected void run(ConfigBag config) { + Preconditions.checkNotNull(getMachine(), "machine"); + if (Boolean.FALSE.equals(this.runAsScript)) { + this.exitCode = getMachine().execCommands(config.getAllConfig(), getSummary(), commands, shellEnvironment); + } else { // runScript = null or TRUE + this.exitCode = getMachine().execScript(config.getAllConfig(), 
getSummary(), commands, shellEnvironment); + } + } + protected String taskTypeShortName() { return "SSH"; } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/PlainSshExecTaskFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/PlainSshExecTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/PlainSshExecTaskFactory.java new file mode 100644 index 0000000..4d5dfce --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ssh/internal/PlainSshExecTaskFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task.ssh.internal; + +import java.util.List; + +import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import com.google.common.base.Function; + +/** the "Plain" class exists purely so we can massage return types for callers' convenience */ +public class PlainSshExecTaskFactory<RET> extends AbstractSshExecTaskFactory<PlainSshExecTaskFactory<RET>,RET> { + /** constructor where machine will be added later */ + public PlainSshExecTaskFactory(String ...commands) { + super(commands); + } + + /** convenience constructor to supply machine immediately */ + public PlainSshExecTaskFactory(SshMachineLocation machine, String ...commands) { + this(commands); + machine(machine); + } + + /** Constructor where machine will be added later */ + public PlainSshExecTaskFactory(List<String> commands) { + this(commands.toArray(new String[commands.size()])); + } + + /** Convenience constructor to supply machine immediately */ + public PlainSshExecTaskFactory(SshMachineLocation machine, List<String> commands) { + this(machine, commands.toArray(new String[commands.size()])); + } + + @Override + public <T2> PlainSshExecTaskFactory<T2> returning(ScriptReturnType type) { + return (PlainSshExecTaskFactory<T2>) super.<T2>returning(type); + } + + @Override + public <RET2> PlainSshExecTaskFactory<RET2> returning(Function<ProcessTaskWrapper<?>, RET2> resultTransformation) { + return (PlainSshExecTaskFactory<RET2>) super.returning(resultTransformation); + } + + @Override + public PlainSshExecTaskFactory<Boolean> returningIsExitCodeZero() { + return (PlainSshExecTaskFactory<Boolean>) super.returningIsExitCodeZero(); + } + + @Override + public PlainSshExecTaskFactory<String> requiringZeroAndReturningStdout() { + return (PlainSshExecTaskFactory<String>) super.requiringZeroAndReturningStdout(); + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskFactory.java new file mode 100644 index 0000000..f66e1ea --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskFactory.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task.system; + +import java.util.Map; + +import org.apache.brooklyn.api.management.TaskFactory; +import org.apache.brooklyn.core.util.internal.ssh.SshTool; +import org.apache.brooklyn.core.util.task.system.ProcessTaskStub.ScriptReturnType; + +import brooklyn.config.ConfigKey; + +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import com.google.common.annotations.Beta; +import com.google.common.base.Function; + +public interface ProcessTaskFactory<T> extends TaskFactory<ProcessTaskWrapper<T>> { + public ProcessTaskFactory<T> machine(SshMachineLocation machine); + public ProcessTaskFactory<T> add(String ...commandsToAdd); + public ProcessTaskFactory<T> add(Iterable<String> commandsToAdd); + public ProcessTaskFactory<T> requiringExitCodeZero(); + public ProcessTaskFactory<T> requiringExitCodeZero(String extraErrorMessage); + public ProcessTaskFactory<T> allowingNonZeroExitCode(); + public ProcessTaskFactory<String> requiringZeroAndReturningStdout(); + public ProcessTaskFactory<Boolean> returningIsExitCodeZero(); + public <RET2> ProcessTaskFactory<RET2> returning(ScriptReturnType type); + public <RET2> ProcessTaskFactory<RET2> returning(Function<ProcessTaskWrapper<?>, RET2> resultTransformation); + public ProcessTaskFactory<T> runAsCommand(); + public ProcessTaskFactory<T> runAsScript(); + public ProcessTaskFactory<T> runAsRoot(); + public ProcessTaskFactory<T> environmentVariable(String key, String val); + public ProcessTaskFactory<T> environmentVariables(Map<String,String> vars); + public ProcessTaskFactory<T> summary(String summary); + + /** allows setting config-key based properties for specific underlying tools */ + @Beta + public <V> ProcessTaskFactory<T> configure(ConfigKey<V> key, V value); + + /** allows setting config-key/flag based properties for specific underlying tools; + * but note that if any are prefixed with {@link SshTool#BROOKLYN_CONFIG_KEY_PREFIX} + * these should normally be filtered out 
*/ + @Beta + public ProcessTaskFactory<T> configure(Map<?,?> flags); + + /** adds a listener which will be notified of (otherwise) successful completion, + * typically used to invalidate the result (ie throw exception, to promote a string in the output to an exception); + * invoked even if return code is zero, so a better error can be thrown */ + public ProcessTaskFactory<T> addCompletionListener(Function<ProcessTaskWrapper<?>, Void> function); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskStub.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskStub.java b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskStub.java new file mode 100644 index 0000000..1937d15 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskStub.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task.system; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.brooklyn.core.util.config.ConfigBag; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import brooklyn.util.collections.MutableMap; +import brooklyn.util.text.Strings; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class ProcessTaskStub { + + protected final List<String> commands = new ArrayList<String>(); + /** null for localhost */ + protected SshMachineLocation machine; + + // config data + protected String summary; + protected final ConfigBag config = ConfigBag.newInstance(); + + public static enum ScriptReturnType { CUSTOM, EXIT_CODE, STDOUT_STRING, STDOUT_BYTES, STDERR_STRING, STDERR_BYTES } + protected Function<ProcessTaskWrapper<?>, ?> returnResultTransformation = null; + protected ScriptReturnType returnType = ScriptReturnType.EXIT_CODE; + + protected Boolean runAsScript = null; + protected boolean runAsRoot = false; + protected Boolean requireExitCodeZero = null; + protected String extraErrorMessage = null; + protected Map<String,String> shellEnvironment = new MutableMap<String, String>(); + protected final List<Function<ProcessTaskWrapper<?>, Void>> completionListeners = new ArrayList<Function<ProcessTaskWrapper<?>,Void>>(); + + public ProcessTaskStub() {} + + protected ProcessTaskStub(ProcessTaskStub source) { + commands.addAll(source.getCommands()); + machine = source.getMachine(); + summary = source.getSummary(); + config.copy(source.getConfig()); + returnResultTransformation = source.returnResultTransformation; + returnType = source.returnType; + runAsScript = source.runAsScript; + runAsRoot = source.runAsRoot; + requireExitCodeZero = source.requireExitCodeZero; + extraErrorMessage = source.extraErrorMessage; + shellEnvironment.putAll(source.getShellEnvironment()); + 
completionListeners.addAll(source.getCompletionListeners()); + } + + public String getSummary() { + if (summary!=null) return summary; + return Strings.maxlen(Strings.join(commands, " ; "), 160); + } + + /** null for localhost */ + public SshMachineLocation getMachine() { + return machine; + } + + public Map<String, String> getShellEnvironment() { + return ImmutableMap.copyOf(shellEnvironment); + } + + @Override + public String toString() { + return super.toString()+"["+getSummary()+"]"; + } + + public List<String> getCommands() { + return ImmutableList.copyOf(commands); + } + + public List<Function<ProcessTaskWrapper<?>, Void>> getCompletionListeners() { + return completionListeners; + } + + protected ConfigBag getConfig() { return config; } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskWrapper.java b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskWrapper.java new file mode 100644 index 0000000..045b3c9 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/system/ProcessTaskWrapper.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task.system; + +import java.io.ByteArrayOutputStream; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskWrapper; +import org.apache.brooklyn.core.util.config.ConfigBag; +import org.apache.brooklyn.core.util.internal.ssh.ShellTool; +import org.apache.brooklyn.core.util.task.TaskBuilder; +import org.apache.brooklyn.core.util.task.Tasks; +import org.apache.brooklyn.core.util.task.system.internal.AbstractProcessTaskFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.util.stream.Streams; +import brooklyn.util.text.Strings; + +import com.google.common.base.Function; + +/** Wraps a fully constructed process task, and allows callers to inspect status. + * Note that methods in here such as {@link #getStdout()} will return partially completed streams while the task is ongoing + * (and exit code will be null). You can {@link #block()} or {@link #get()} as conveniences on the underlying {@link #getTask()}. 
*/ +public abstract class ProcessTaskWrapper<RET> extends ProcessTaskStub implements TaskWrapper<RET> { + + private static final Logger log = LoggerFactory.getLogger(ProcessTaskWrapper.class); + + private final Task<RET> task; + + // execution details + protected ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + protected ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + protected Integer exitCode = null; + + @SuppressWarnings("unchecked") + protected ProcessTaskWrapper(AbstractProcessTaskFactory<?,RET> constructor) { + super(constructor); + TaskBuilder<Object> tb = constructor.constructCustomizedTaskBuilder(); + if (stdout!=null) tb.tag(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDOUT, stdout)); + if (stderr!=null) tb.tag(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDERR, stderr)); + task = (Task<RET>) tb.body(new ProcessTaskInternalJob()).build(); + } + + @Override + public Task<RET> asTask() { + return getTask(); + } + + @Override + public Task<RET> getTask() { + return task; + } + + public Integer getExitCode() { + return exitCode; + } + + public byte[] getStdoutBytes() { + if (stdout==null) return null; + return stdout.toByteArray(); + } + + public byte[] getStderrBytes() { + if (stderr==null) return null; + return stderr.toByteArray(); + } + + public String getStdout() { + if (stdout==null) return null; + return stdout.toString(); + } + + public String getStderr() { + if (stderr==null) return null; + return stderr.toString(); + } + + protected class ProcessTaskInternalJob implements Callable<Object> { + @Override + public Object call() throws Exception { + run( getConfigForRunning() ); + + for (Function<ProcessTaskWrapper<?>, Void> listener: completionListeners) { + try { + listener.apply(ProcessTaskWrapper.this); + } catch (Exception e) { + logWithDetailsAndThrow("Error in "+taskTypeShortName()+" task "+getSummary()+": "+e, e); + } + } + + if (exitCode!=0 && !Boolean.FALSE.equals(requireExitCodeZero)) { 
+ if (Boolean.TRUE.equals(requireExitCodeZero)) { + logWithDetailsAndThrow(taskTypeShortName()+" task ended with exit code "+exitCode+" when 0 was required, in "+Tasks.current()+": "+getSummary(), null); + } else { + // warn, but allow, on non-zero not explicitly allowed + log.warn(taskTypeShortName()+" task ended with exit code "+exitCode+" when non-zero was not explicitly allowed (error may be thrown in future), in " + +Tasks.current()+": "+getSummary()); + } + } + switch (returnType) { + case CUSTOM: return returnResultTransformation.apply(ProcessTaskWrapper.this); + case STDOUT_STRING: return stdout.toString(); + case STDOUT_BYTES: return stdout.toByteArray(); + case STDERR_STRING: return stderr.toString(); + case STDERR_BYTES: return stderr.toByteArray(); + case EXIT_CODE: return exitCode; + } + + throw new IllegalStateException("Unknown return type for "+taskTypeShortName()+" job "+getSummary()+": "+returnType); + } + + protected void logWithDetailsAndThrow(String message, Throwable optionalCause) { + message = (extraErrorMessage!=null ? 
extraErrorMessage+": " : "") + message; + log.warn(message+" (throwing)"); + logProblemDetails("STDERR", stderr, 1024); + logProblemDetails("STDOUT", stdout, 1024); + logProblemDetails("STDIN", Streams.byteArrayOfString(Strings.join(commands,"\n")), 4096); + if (optionalCause!=null) throw new IllegalStateException(message, optionalCause); + throw new IllegalStateException(message); + } + + protected void logProblemDetails(String streamName, ByteArrayOutputStream stream, int max) { + Streams.logStreamTail(log, streamName+" for problem in "+Tasks.current(), stream, max); + } + + } + + @Override + public String toString() { + return super.toString()+"["+task+"]"; + } + + /** blocks and gets the result, throwing if there was an exception */ + public RET get() { + return getTask().getUnchecked(); + } + + /** blocks until the task completes; does not throw */ + public ProcessTaskWrapper<RET> block() { + getTask().blockUntilEnded(); + return this; + } + + /** true iff the process has completed (with or without failure) */ + public boolean isDone() { + return getTask().isDone(); + } + + /** for overriding */ + protected ConfigBag getConfigForRunning() { + ConfigBag config = ConfigBag.newInstanceCopying(ProcessTaskWrapper.this.config); + if (stdout!=null) config.put(ShellTool.PROP_OUT_STREAM, stdout); + if (stderr!=null) config.put(ShellTool.PROP_ERR_STREAM, stderr); + + if (!config.containsKey(ShellTool.PROP_NO_EXTRA_OUTPUT)) + // by default no extra output (so things like cat, etc work as expected) + config.put(ShellTool.PROP_NO_EXTRA_OUTPUT, true); + + if (runAsRoot) + config.put(ShellTool.PROP_RUN_AS_ROOT, true); + return config; + } + + protected abstract void run(ConfigBag config); + + protected abstract String taskTypeShortName(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/system/SystemTasks.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/system/SystemTasks.java b/core/src/main/java/org/apache/brooklyn/core/util/task/system/SystemTasks.java new file mode 100644 index 0000000..d05b09c --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/system/SystemTasks.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task.system; + +import org.apache.brooklyn.core.util.task.system.internal.SystemProcessTaskFactory.ConcreteSystemProcessTaskFactory; + +public class SystemTasks { + + public static ProcessTaskFactory<Integer> exec(String ...commands) { + return new ConcreteSystemProcessTaskFactory<Integer>(commands); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/task/system/internal/AbstractProcessTaskFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/system/internal/AbstractProcessTaskFactory.java b/core/src/main/java/org/apache/brooklyn/core/util/task/system/internal/AbstractProcessTaskFactory.java new file mode 100644 index 0000000..8b90263 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/system/internal/AbstractProcessTaskFactory.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task.system.internal; + +import java.util.Arrays; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.BrooklynTaskTags; + +import org.apache.brooklyn.core.util.task.TaskBuilder; +import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory; +import org.apache.brooklyn.core.util.task.system.ProcessTaskStub; +import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import brooklyn.util.stream.Streams; +import brooklyn.util.text.Strings; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +public abstract class AbstractProcessTaskFactory<T extends AbstractProcessTaskFactory<T,RET>,RET> extends ProcessTaskStub implements ProcessTaskFactory<RET> { + + private static final Logger log = LoggerFactory.getLogger(AbstractProcessTaskFactory.class); + + protected boolean dirty = false; + + public AbstractProcessTaskFactory(String ...commands) { + this.commands.addAll(Arrays.asList(commands)); + } + + @SuppressWarnings("unchecked") + protected T self() { return (T)this; } + + protected void markDirty() { + dirty = true; + } + + @Override + public T add(String ...commandsToAdd) { + markDirty(); + for (String commandToAdd: commandsToAdd) this.commands.add(commandToAdd); + return self(); + } + + @Override + public T add(Iterable<String> commandsToAdd) { + Iterables.addAll(this.commands, commandsToAdd); + return self(); + } + + @Override + public T machine(SshMachineLocation machine) { + markDirty(); + this.machine = machine; + return self(); + } + + @Override + public T requiringExitCodeZero() { + markDirty(); + requireExitCodeZero = true; + return self(); + } + + @Override + public T requiringExitCodeZero(String extraErrorMessage) { + markDirty(); + requireExitCodeZero = 
true; + this.extraErrorMessage = extraErrorMessage; + return self(); + } + + @Override + public T allowingNonZeroExitCode() { + markDirty(); + requireExitCodeZero = false; + return self(); + } + + @Override + public ProcessTaskFactory<Boolean> returningIsExitCodeZero() { + if (requireExitCodeZero==null) allowingNonZeroExitCode(); + return returning(new Function<ProcessTaskWrapper<?>,Boolean>() { + public Boolean apply(ProcessTaskWrapper<?> input) { + return input.getExitCode()==0; + } + }); + } + + @Override + public ProcessTaskFactory<String> requiringZeroAndReturningStdout() { + requiringExitCodeZero(); + return this.<String>returning(ScriptReturnType.STDOUT_STRING); + } + + @Override + @SuppressWarnings("unchecked") + public <RET2> ProcessTaskFactory<RET2> returning(ScriptReturnType type) { + markDirty(); + returnType = Preconditions.checkNotNull(type); + return (ProcessTaskFactory<RET2>) self(); + } + + @Override + @SuppressWarnings("unchecked") + public <RET2> ProcessTaskFactory<RET2> returning(Function<ProcessTaskWrapper<?>, RET2> resultTransformation) { + markDirty(); + returnType = ScriptReturnType.CUSTOM; + this.returnResultTransformation = resultTransformation; + return (ProcessTaskFactory<RET2>) self(); + } + + @Override + public T runAsCommand() { + markDirty(); + runAsScript = false; + return self(); + } + + @Override + public T runAsScript() { + markDirty(); + runAsScript = true; + return self(); + } + + @Override + public T runAsRoot() { + markDirty(); + runAsRoot = true; + return self(); + } + + @Override + public T environmentVariable(String key, String val) { + markDirty(); + shellEnvironment.put(key, val); + return self(); + } + + @Override + public T environmentVariables(Map<String,String> vars) { + if (vars!=null) { + markDirty(); + shellEnvironment.putAll(vars); + } + return self(); + } + + /** creates the TaskBuilder which can be further customized; typically invoked by the initial {@link #newTask()} */ + public TaskBuilder<Object> 
constructCustomizedTaskBuilder() { + TaskBuilder<Object> tb = TaskBuilder.builder().dynamic(false).name("ssh: "+getSummary()); + + tb.tag(BrooklynTaskTags.tagForStream(BrooklynTaskTags.STREAM_STDIN, + Streams.byteArrayOfString(Strings.join(commands, "\n")))); + tb.tag(BrooklynTaskTags.tagForEnvStream(BrooklynTaskTags.STREAM_ENV, shellEnvironment)); + + return tb; + } + + @Override + public T summary(String summary) { + markDirty(); + this.summary = summary; + return self(); + } + + @Override + public <V> T configure(ConfigKey<V> key, V value) { + config.configure(key, value); + return self(); + } + + @Override + public T configure(Map<?, ?> flags) { + if (flags!=null) + config.putAll(flags); + return self(); + } + + @Override + public T addCompletionListener(Function<ProcessTaskWrapper<?>, Void> listener) { + completionListeners.add(listener); + return self(); + } + + @Override + protected void finalize() throws Throwable { + // help let people know of API usage error + if (dirty) + log.warn("Task "+this+" was modified but modification was never used"); + super.finalize(); + } +}
