http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/software/MachineLifecycleEffectorTasks.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/software/MachineLifecycleEffectorTasks.java b/software/base/src/main/java/brooklyn/entity/software/MachineLifecycleEffectorTasks.java deleted file mode 100644 index 927816d..0000000 --- a/software/base/src/main/java/brooklyn/entity/software/MachineLifecycleEffectorTasks.java +++ /dev/null @@ -1,949 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.software; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.api.effector.Effector; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.location.MachineLocation; -import org.apache.brooklyn.api.location.MachineManagementMixins; -import org.apache.brooklyn.api.location.MachineProvisioningLocation; -import org.apache.brooklyn.api.location.NoMachinesAvailableException; -import org.apache.brooklyn.api.location.MachineManagementMixins.SuspendsMachines; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.config.Sanitizer; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.effector.core.EffectorBody; -import org.apache.brooklyn.effector.core.Effectors; -import org.apache.brooklyn.entity.core.Attributes; -import org.apache.brooklyn.entity.core.BrooklynConfigKeys; -import org.apache.brooklyn.entity.core.Entities; -import org.apache.brooklyn.entity.core.EntityInternal; -import org.apache.brooklyn.entity.lifecycle.Lifecycle; -import org.apache.brooklyn.entity.lifecycle.ServiceStateLogic; -import org.apache.brooklyn.entity.stock.EffectorStartableImpl.StartParameters; -import org.apache.brooklyn.entity.trait.Startable; -import org.apache.brooklyn.entity.trait.StartableMethods; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.Beta; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - 
-import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.basic.SoftwareProcess.RestartSoftwareParameters; -import brooklyn.entity.basic.SoftwareProcess.RestartSoftwareParameters.RestartMachineMode; -import brooklyn.entity.basic.SoftwareProcess.StopSoftwareParameters; -import brooklyn.entity.basic.SoftwareProcess.StopSoftwareParameters.StopMode; - -import org.apache.brooklyn.location.basic.AbstractLocation; -import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation; -import org.apache.brooklyn.location.basic.Locations; -import org.apache.brooklyn.location.basic.Machines; -import org.apache.brooklyn.location.basic.SshMachineLocation; -import org.apache.brooklyn.location.cloud.CloudLocationConfig; -import org.apache.brooklyn.sensor.feed.ConfigToAttributes; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.core.task.DynamicTasks; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; -import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.guava.Maybe; -import org.apache.brooklyn.util.net.UserAndHostAndPort; -import org.apache.brooklyn.util.os.Os; -import org.apache.brooklyn.util.ssh.BashCommands; -import org.apache.brooklyn.util.text.Strings; -import org.apache.brooklyn.util.time.Duration; - -/** - * Default skeleton for start/stop/restart tasks on machines. - * <p> - * Knows how to provision machines, making use of {@link ProvidesProvisioningFlags#obtainProvisioningFlags(MachineProvisioningLocation)}, - * and provides hooks for injecting behaviour at common places. - * <p> - * Methods are designed for overriding, with the convention that *Async methods should queue (and not block). 
- * The following methods are commonly overridden (and you can safely queue tasks, block, or return immediately in them): - * <ul> - * <li> {@link #startProcessesAtMachine(Supplier)} (required) - * <li> {@link #stopProcessesAtMachine()} (required, but can be left blank if you assume the VM will be destroyed) - * <li> {@link #preStartCustom(MachineLocation)} - * <li> {@link #postStartCustom()} - * <li> {@link #preStopCustom()} - * <li> {@link #postStopCustom()} - * </ul> - * Note methods at this level typically look after the {@link Attributes#SERVICE_STATE} sensor. - * - * @since 0.6.0 - */ -@Beta -public abstract class MachineLifecycleEffectorTasks { - - private static final Logger log = LoggerFactory.getLogger(MachineLifecycleEffectorTasks.class); - - public static final ConfigKey<Boolean> ON_BOX_BASE_DIR_RESOLVED = ConfigKeys.newBooleanConfigKey("onbox.base.dir.resolved", - "Whether the on-box base directory has been resolved (for internal use)"); - - public static final ConfigKey<Collection<? extends Location>> LOCATIONS = StartParameters.LOCATIONS; - public static final ConfigKey<Duration> STOP_PROCESS_TIMEOUT = ConfigKeys.newConfigKey(Duration.class, - "process.stop.timeout", "How long to wait for the processes to be stopped; use null to mean forever", Duration.TWO_MINUTES); - - protected final MachineInitTasks machineInitTasks = new MachineInitTasks(); - - /** Attaches lifecycle effectors (start, restart, stop) to the given entity post-creation. */ - public void attachLifecycleEffectors(Entity entity) { - ((EntityInternal) entity).getMutableEntityType().addEffector(newStartEffector()); - ((EntityInternal) entity).getMutableEntityType().addEffector(newRestartEffector()); - ((EntityInternal) entity).getMutableEntityType().addEffector(newStopEffector()); - } - - /** - * Return an effector suitable for setting in a {@code public static final} or attaching dynamically. 
- * <p> - * The effector overrides the corresponding effector from {@link Startable} with - * the behaviour in this lifecycle class instance. - */ - public Effector<Void> newStartEffector() { - return Effectors.effector(Startable.START).impl(newStartEffectorTask()).build(); - } - - /** @see {@link #newStartEffector()} */ - public Effector<Void> newRestartEffector() { - return Effectors.effector(Startable.RESTART) - .parameter(RestartSoftwareParameters.RESTART_CHILDREN) - .parameter(RestartSoftwareParameters.RESTART_MACHINE) - .impl(newRestartEffectorTask()) - .build(); - } - - /** @see {@link #newStartEffector()} */ - public Effector<Void> newStopEffector() { - return Effectors.effector(Startable.STOP) - .parameter(StopSoftwareParameters.STOP_PROCESS_MODE) - .parameter(StopSoftwareParameters.STOP_MACHINE_MODE) - .impl(newStopEffectorTask()) - .build(); - } - - /** @see {@link #newStartEffector()} */ - public Effector<Void> newSuspendEffector() { - return Effectors.effector(Void.class, "suspend") - .description("Suspend the process/service represented by an entity") - .parameter(StopSoftwareParameters.STOP_PROCESS_MODE) - .parameter(StopSoftwareParameters.STOP_MACHINE_MODE) - .impl(newSuspendEffectorTask()) - .build(); - } - - /** - * Returns the {@link EffectorBody} which supplies the implementation for the start effector. - * <p> - * Calls {@link #start(Collection)} in this class. - */ - public EffectorBody<Void> newStartEffectorTask() { - // TODO included anonymous inner class for backwards compatibility with persisted state. - new EffectorBody<Void>() { - @Override - public Void call(ConfigBag parameters) { - Collection<? 
extends Location> locations = null; - - Object locationsRaw = parameters.getStringKey(LOCATIONS.getName()); - locations = Locations.coerceToCollection(entity().getManagementContext(), locationsRaw); - - if (locations==null) { - // null/empty will mean to inherit from parent - locations = Collections.emptyList(); - } - - start(locations); - return null; - } - }; - return new StartEffectorBody(); - } - - private class StartEffectorBody extends EffectorBody<Void> { - @Override - public Void call(ConfigBag parameters) { - Collection<? extends Location> locations = null; - - Object locationsRaw = parameters.getStringKey(LOCATIONS.getName()); - locations = Locations.coerceToCollection(entity().getManagementContext(), locationsRaw); - - if (locations == null) { - // null/empty will mean to inherit from parent - locations = Collections.emptyList(); - } - - start(locations); - return null; - } - - } - - /** - * Calls {@link #restart(ConfigBag)}. - * - * @see {@link #newStartEffectorTask()} - */ - public EffectorBody<Void> newRestartEffectorTask() { - // TODO included anonymous inner class for backwards compatibility with persisted state. - new EffectorBody<Void>() { - @Override - public Void call(ConfigBag parameters) { - restart(parameters); - return null; - } - }; - return new RestartEffectorBody(); - } - - private class RestartEffectorBody extends EffectorBody<Void> { - @Override - public Void call(ConfigBag parameters) { - restart(parameters); - return null; - } - } - - /** - * Calls {@link #stop(ConfigBag)}. - * - * @see {@link #newStartEffectorTask()} - */ - public EffectorBody<Void> newStopEffectorTask() { - // TODO included anonymous inner class for backwards compatibility with persisted state. 
- new EffectorBody<Void>() { - @Override - public Void call(ConfigBag parameters) { - stop(parameters); - return null; - } - }; - return new StopEffectorBody(); - } - - private class StopEffectorBody extends EffectorBody<Void> { - @Override - public Void call(ConfigBag parameters) { - stop(parameters); - return null; - } - } - - /** - * Calls {@link #suspend(ConfigBag)}. - * - * @see {@link #newStartEffectorTask()} - */ - public EffectorBody<Void> newSuspendEffectorTask() { - return new SuspendEffectorBody(); - } - - private class SuspendEffectorBody extends EffectorBody<Void> { - @Override - public Void call(ConfigBag parameters) { - suspend(parameters); - return null; - } - } - - protected EntityInternal entity() { - return (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()); - } - - protected Location getLocation(@Nullable Collection<? extends Location> locations) { - if (locations==null || locations.isEmpty()) locations = entity().getLocations(); - if (locations.isEmpty()) { - MachineProvisioningLocation<?> provisioner = entity().getAttribute(SoftwareProcess.PROVISIONING_LOCATION); - if (provisioner!=null) locations = Arrays.<Location>asList(provisioner); - } - locations = Locations.getLocationsCheckingAncestors(locations, entity()); - - Maybe<MachineLocation> ml = Locations.findUniqueMachineLocation(locations); - if (ml.isPresent()) return ml.get(); - - if (locations.isEmpty()) - throw new IllegalArgumentException("No locations specified when starting "+entity()); - if (locations.size() != 1 || Iterables.getOnlyElement(locations)==null) - throw new IllegalArgumentException("Ambiguous locations detected when starting "+entity()+": "+locations); - return Iterables.getOnlyElement(locations); - } - - /** runs the tasks needed to start, wrapped by setting {@link Attributes#SERVICE_STATE_EXPECTED} appropriately */ - public void start(Collection<? 
extends Location> locations) { - ServiceStateLogic.setExpectedState(entity(), Lifecycle.STARTING); - try { - startInLocations(locations); - DynamicTasks.waitForLast(); - ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING); - } catch (Throwable t) { - ServiceStateLogic.setExpectedState(entity(), Lifecycle.ON_FIRE); - throw Exceptions.propagate(t); - } - } - - /** Asserts there is a single location and calls {@link #startInLocation(Location)} with that location. */ - protected void startInLocations(Collection<? extends Location> locations) { - startInLocation(getLocation(locations)); - } - - /** Dispatches to the appropriate method(s) to start in the given location. */ - protected void startInLocation(final Location location) { - Supplier<MachineLocation> locationS = null; - if (location instanceof MachineProvisioningLocation) { - Task<MachineLocation> machineTask = provisionAsync((MachineProvisioningLocation<?>)location); - locationS = Tasks.supplier(machineTask); - } else if (location instanceof MachineLocation) { - locationS = Suppliers.ofInstance((MachineLocation)location); - } - Preconditions.checkState(locationS != null, "Unsupported location "+location+", when starting "+entity()); - - final Supplier<MachineLocation> locationSF = locationS; - preStartAtMachineAsync(locationSF); - DynamicTasks.queue("start (processes)", new StartProcessesAtMachineTask(locationSF)); - postStartAtMachineAsync(); - } - - private class StartProcessesAtMachineTask implements Runnable { - private final Supplier<MachineLocation> machineSupplier; - private StartProcessesAtMachineTask(Supplier<MachineLocation> machineSupplier) { - this.machineSupplier = machineSupplier; - } - @Override - public void run() { - startProcessesAtMachine(machineSupplier); - } - } - - /** - * Returns a queued {@link Task} which provisions a machine in the given location - * and returns that machine. The task can be used as a supplier to subsequent methods. 
- */ - protected Task<MachineLocation> provisionAsync(final MachineProvisioningLocation<?> location) { - return DynamicTasks.queue(Tasks.<MachineLocation>builder().name("provisioning (" + location.getDisplayName() + ")").body( - new ProvisionMachineTask(location)).build()); - } - - private class ProvisionMachineTask implements Callable<MachineLocation> { - final MachineProvisioningLocation<?> location; - - private ProvisionMachineTask(MachineProvisioningLocation<?> location) { - this.location = location; - } - - public MachineLocation call() throws Exception { - // Blocks if a latch was configured. - entity().getConfig(BrooklynConfigKeys.PROVISION_LATCH); - final Map<String, Object> flags = obtainProvisioningFlags(location); - if (!(location instanceof LocalhostMachineProvisioningLocation)) - log.info("Starting {}, obtaining a new location instance in {} with ports {}", new Object[]{entity(), location, flags.get("inboundPorts")}); - entity().setAttribute(SoftwareProcess.PROVISIONING_LOCATION, location); - MachineLocation machine; - try { - machine = Tasks.withBlockingDetails("Provisioning machine in " + location, new ObtainLocationTask(location, flags)); - if (machine == null) - throw new NoMachinesAvailableException("Failed to obtain machine in " + location.toString()); - } catch (Exception e) { - throw Exceptions.propagate(e); - } - - if (log.isDebugEnabled()) - log.debug("While starting {}, obtained new location instance {}", entity(), - (machine instanceof SshMachineLocation ? 
- machine + ", details " + ((SshMachineLocation) machine).getUser() + ":" + Sanitizer.sanitize(((SshMachineLocation) machine).config().getLocalBag()) - : machine)); - return machine; - } - } - - private static class ObtainLocationTask implements Callable<MachineLocation> { - final MachineProvisioningLocation<?> location; - final Map<String, Object> flags; - - private ObtainLocationTask(MachineProvisioningLocation<?> location, Map<String, Object> flags) { - this.flags = flags; - this.location = location; - } - - public MachineLocation call() throws NoMachinesAvailableException { - return location.obtain(flags); - } - } - - /** Wraps a call to {@link #preStartCustom(MachineLocation)}, after setting the hostname and address. */ - protected void preStartAtMachineAsync(final Supplier<MachineLocation> machineS) { - DynamicTasks.queue("pre-start", new PreStartTask(machineS.get())); - } - - private class PreStartTask implements Runnable { - final MachineLocation machine; - private PreStartTask(MachineLocation machine) { - this.machine = machine; - } - public void run() { - log.info("Starting {} on machine {}", entity(), machine); - Collection<Location> oldLocs = entity().getLocations(); - if (!oldLocs.isEmpty()) { - List<MachineLocation> oldSshLocs = ImmutableList.copyOf(Iterables.filter(oldLocs, MachineLocation.class)); - if (!oldSshLocs.isEmpty()) { - // check if existing locations are compatible - log.debug("Entity " + entity() + " had machine locations " + oldSshLocs + " when starting at " + machine + "; checking if they are compatible"); - for (MachineLocation oldLoc : oldSshLocs) { - // machines are deemed compatible if hostname and address are the same, or they are localhost - // this allows a machine create by jclouds to then be defined with an ip-based spec - if (!"localhost".equals(machine.getConfig(AbstractLocation.ORIGINAL_SPEC))) { - checkLocationParametersCompatible(machine, oldLoc, "hostname", - oldLoc.getAddress().getHostName(), 
machine.getAddress().getHostName()); - checkLocationParametersCompatible(machine, oldLoc, "address", - oldLoc.getAddress().getHostAddress(), machine.getAddress().getHostAddress()); - } - } - log.debug("Entity " + entity() + " old machine locations " + oldSshLocs + " were compatible, removing them to start at " + machine); - entity().removeLocations(oldSshLocs); - } - } - entity().addLocations(ImmutableList.of((Location) machine)); - - // elsewhere we rely on (public) hostname being set _after_ subnet_hostname - // (to prevent the tiny possibility of races resulting in hostname being returned - // simply because subnet is still being looked up) - Maybe<String> lh = Machines.getSubnetHostname(machine); - Maybe<String> la = Machines.getSubnetIp(machine); - if (lh.isPresent()) entity().setAttribute(Attributes.SUBNET_HOSTNAME, lh.get()); - if (la.isPresent()) entity().setAttribute(Attributes.SUBNET_ADDRESS, la.get()); - entity().setAttribute(Attributes.HOSTNAME, machine.getAddress().getHostName()); - entity().setAttribute(Attributes.ADDRESS, machine.getAddress().getHostAddress()); - if (machine instanceof SshMachineLocation) { - @SuppressWarnings("resource") - SshMachineLocation sshMachine = (SshMachineLocation) machine; - UserAndHostAndPort sshAddress = UserAndHostAndPort.fromParts(sshMachine.getUser(), sshMachine.getAddress().getHostName(), sshMachine.getPort()); - entity().setAttribute(Attributes.SSH_ADDRESS, sshAddress); - } - - if (Boolean.TRUE.equals(entity().getConfig(SoftwareProcess.OPEN_IPTABLES))) { - if (machine instanceof SshMachineLocation) { - Iterable<Integer> inboundPorts = (Iterable<Integer>) machine.config().get(CloudLocationConfig.INBOUND_PORTS); - machineInitTasks.openIptablesAsync(inboundPorts, (SshMachineLocation)machine); - } else { - log.warn("Ignoring flag OPEN_IPTABLES on non-ssh location {}", machine); - } - } - if (Boolean.TRUE.equals(entity().getConfig(SoftwareProcess.STOP_IPTABLES))) { - if (machine instanceof SshMachineLocation) { - 
machineInitTasks.stopIptablesAsync((SshMachineLocation)machine); - } else { - log.warn("Ignoring flag STOP_IPTABLES on non-ssh location {}", machine); - } - } - if (Boolean.TRUE.equals(entity().getConfig(SoftwareProcess.DONT_REQUIRE_TTY_FOR_SUDO))) { - if (machine instanceof SshMachineLocation) { - machineInitTasks.dontRequireTtyForSudoAsync((SshMachineLocation)machine); - } else { - log.warn("Ignoring flag DONT_REQUIRE_TTY_FOR_SUDO on non-ssh location {}", machine); - } - } - resolveOnBoxDir(entity(), machine); - preStartCustom(machine); - } - } - - /** - * Resolves the on-box dir. - * <p> - * Initialize and pre-create the right onbox working dir, if an ssh machine location. - * Logs a warning if not. - */ - @SuppressWarnings("deprecation") - public static String resolveOnBoxDir(EntityInternal entity, MachineLocation machine) { - String base = entity.getConfig(BrooklynConfigKeys.ONBOX_BASE_DIR); - if (base==null) base = machine.getConfig(BrooklynConfigKeys.ONBOX_BASE_DIR); - if (base!=null && Boolean.TRUE.equals(entity.getConfig(ON_BOX_BASE_DIR_RESOLVED))) return base; - if (base==null) base = entity.getManagementContext().getConfig().getConfig(BrooklynConfigKeys.ONBOX_BASE_DIR); - if (base==null) base = entity.getConfig(BrooklynConfigKeys.BROOKLYN_DATA_DIR); - if (base==null) base = machine.getConfig(BrooklynConfigKeys.BROOKLYN_DATA_DIR); - if (base==null) base = entity.getManagementContext().getConfig().getConfig(BrooklynConfigKeys.BROOKLYN_DATA_DIR); - if (base==null) base = "~/brooklyn-managed-processes"; - if (base.equals("~")) base="."; - if (base.startsWith("~/")) base = "."+base.substring(1); - - String resolvedBase = null; - if (entity.getConfig(BrooklynConfigKeys.SKIP_ON_BOX_BASE_DIR_RESOLUTION) || machine.getConfig(BrooklynConfigKeys.SKIP_ON_BOX_BASE_DIR_RESOLUTION)) { - if (log.isDebugEnabled()) log.debug("Skipping on-box base dir resolution for "+entity+" at "+machine); - if (!Os.isAbsolutish(base)) base = "~/"+base; - resolvedBase = 
Os.tidyPath(base); - } else if (machine instanceof SshMachineLocation) { - SshMachineLocation ms = (SshMachineLocation)machine; - ProcessTaskWrapper<Integer> baseTask = SshEffectorTasks.ssh( - BashCommands.alternatives("mkdir -p \"${BASE_DIR}\"", - BashCommands.chain( - BashCommands.sudo("mkdir -p \"${BASE_DIR}\""), - BashCommands.sudo("chown "+ms.getUser()+" \"${BASE_DIR}\""))), - "cd ~", - "cd ${BASE_DIR}", - "echo BASE_DIR_RESULT':'`pwd`:BASE_DIR_RESULT") - .environmentVariable("BASE_DIR", base) - .requiringExitCodeZero() - .summary("initializing on-box base dir "+base).newTask(); - DynamicTasks.queueIfPossible(baseTask).orSubmitAsync(entity); - resolvedBase = Strings.getFragmentBetween(baseTask.block().getStdout(), "BASE_DIR_RESULT:", ":BASE_DIR_RESULT"); - } - if (resolvedBase==null) { - if (!Os.isAbsolutish(base)) base = "~/"+base; - resolvedBase = Os.tidyPath(base); - log.warn("Could not resolve on-box directory for "+entity+" at "+machine+"; using "+resolvedBase+", though this may not be accurate at the target (and may fail shortly)"); - } - entity.setConfig(BrooklynConfigKeys.ONBOX_BASE_DIR, resolvedBase); - entity.setConfig(ON_BOX_BASE_DIR_RESOLVED, true); - return resolvedBase; - } - - protected void checkLocationParametersCompatible(MachineLocation oldLoc, MachineLocation newLoc, String paramSummary, - Object oldParam, Object newParam) { - if (oldParam==null || newParam==null || !oldParam.equals(newParam)) - throw new IllegalStateException("Cannot start "+entity()+" in "+newLoc+" as it has already been started with incompatible location "+oldLoc+" " + - "("+paramSummary+" not compatible: "+oldParam+" / "+newParam+"); "+newLoc+" may require manual removal."); - } - - /** - * Default pre-start hooks. - * <p> - * Can be extended by subclasses if needed. 
- */ - protected void preStartCustom(MachineLocation machine) { - ConfigToAttributes.apply(entity()); - - // Opportunity to block startup until other dependent components are available - Object val = entity().getConfig(SoftwareProcess.START_LATCH); - if (val != null) log.debug("{} finished waiting for start-latch; continuing...", entity(), val); - } - - protected Map<String, Object> obtainProvisioningFlags(final MachineProvisioningLocation<?> location) { - if (entity() instanceof ProvidesProvisioningFlags) { - return ((ProvidesProvisioningFlags)entity()).obtainProvisioningFlags(location).getAllConfig(); - } - return MutableMap.<String, Object>of(); - } - - protected abstract String startProcessesAtMachine(final Supplier<MachineLocation> machineS); - - protected void postStartAtMachineAsync() { - DynamicTasks.queue("post-start", new PostStartTask()); - } - - private class PostStartTask implements Runnable { - public void run() { - postStartCustom(); - } - } - - /** - * Default post-start hooks. - * <p> - * Can be extended by subclasses, and typically will wait for confirmation of start. - * The service not set to running until after this. Also invoked following a restart. - */ - protected void postStartCustom() { - // nothing by default - } - - /** - * whether when 'auto' mode is specified, the machine should be stopped when the restart effector is called - * <p> - * with {@link MachineLifecycleEffectorTasks}, a machine will always get created on restart if there wasn't one already - * (unlike certain subclasses which might attempt a shortcut process-level restart) - * so there is no reason for default behaviour of restart to throw away a provisioned machine, - * hence default impl returns <code>false</code>. - * <p> - * if it is possible to tell that a machine is unhealthy, or if {@link #restart(ConfigBag)} is overridden, - * then it might be appropriate to return <code>true</code> here. 
- */ - protected boolean getDefaultRestartStopsMachine() { - return false; - } - - /** - * Default restart implementation for an entity. - * <p> - * Stops processes if possible, then starts the entity again. - */ - public void restart(ConfigBag parameters) { - ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPING); - - RestartMachineMode isRestartMachine = parameters.get(RestartSoftwareParameters.RESTART_MACHINE_TYPED); - if (isRestartMachine==null) - isRestartMachine=RestartMachineMode.AUTO; - if (isRestartMachine==RestartMachineMode.AUTO) - isRestartMachine = getDefaultRestartStopsMachine() ? RestartMachineMode.TRUE : RestartMachineMode.FALSE; - - // Calling preStopCustom without a corresponding postStopCustom invocation - // doesn't look right so use a separate callback pair; Also depending on the arguments - // stop() could be called which will call the {pre,post}StopCustom on its own. - DynamicTasks.queue("pre-restart", new PreRestartTask()); - - if (isRestartMachine==RestartMachineMode.FALSE) { - DynamicTasks.queue("stopping (process)", new StopProcessesAtMachineTask()); - } else { - DynamicTasks.queue("stopping (machine)", new StopMachineTask()); - } - - DynamicTasks.queue("starting", new StartInLocationsTask()); - restartChildren(parameters); - DynamicTasks.queue("post-restart", new PostRestartTask()); - - DynamicTasks.waitForLast(); - ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING); - } - - private class PreRestartTask implements Runnable { - @Override - public void run() { - preRestartCustom(); - } - } - private class PostRestartTask implements Runnable { - @Override - public void run() { - postRestartCustom(); - } - } - private class StartInLocationsTask implements Runnable { - @Override - public void run() { - // startInLocations will look up the location, and provision a machine if necessary - // (if it remembered the provisioning location) - ServiceStateLogic.setExpectedState(entity(), Lifecycle.STARTING); - 
startInLocations(null); - } - } - - protected void restartChildren(ConfigBag parameters) { - // TODO should we consult ChildStartableMode? - - Boolean isRestartChildren = parameters.get(RestartSoftwareParameters.RESTART_CHILDREN); - if (isRestartChildren==null || !isRestartChildren) { - return; - } - - if (isRestartChildren) { - DynamicTasks.queue(StartableMethods.restartingChildren(entity(), parameters)); - return; - } - - throw new IllegalArgumentException("Invalid value '"+isRestartChildren+"' for "+RestartSoftwareParameters.RESTART_CHILDREN.getName()); - } - - /** - * Default stop implementation for an entity. - * <p> - * Aborts if already stopped, otherwise sets state {@link Lifecycle#STOPPING} then - * invokes {@link #preStopCustom()}, {@link #stopProcessesAtMachine()}, then finally - * {@link #stopAnyProvisionedMachines()} and sets state {@link Lifecycle#STOPPED}. - * If no errors were encountered call {@link #postStopCustom()} at the end. - */ - public void stop(ConfigBag parameters) { - doStop(parameters, new StopAnyProvisionedMachinesTask()); - } - - /** - * As {@link #stop} but calling {@link #suspendAnyProvisionedMachines} rather than - * {@link #stopAnyProvisionedMachines}. 
- */ - public void suspend(ConfigBag parameters) { - doStop(parameters, new SuspendAnyProvisionedMachinesTask()); - } - - protected void doStop(ConfigBag parameters, Callable<StopMachineDetails<Integer>> stopTask) { - preStopConfirmCustom(); - - log.info("Stopping {} in {}", entity(), entity().getLocations()); - - StopMode stopMachineMode = getStopMachineMode(parameters); - StopMode stopProcessMode = parameters.get(StopSoftwareParameters.STOP_PROCESS_MODE); - - DynamicTasks.queue("pre-stop", new PreStopCustomTask()); - - Maybe<MachineLocation> machine = Machines.findUniqueMachineLocation(entity().getLocations()); - Task<String> stoppingProcess = null; - if (canStop(stopProcessMode, entity())) { - stoppingProcess = DynamicTasks.queue("stopping (process)", new StopProcessesAtMachineTask()); - } - - Task<StopMachineDetails<Integer>> stoppingMachine = null; - if (canStop(stopMachineMode, machine.isAbsent())) { - // Release this machine (even if error trying to stop process - we rethrow that after) - stoppingMachine = DynamicTasks.queue("stopping (machine)", stopTask); - - DynamicTasks.drain(entity().getConfig(STOP_PROCESS_TIMEOUT), false); - - // shutdown the machine if stopping process fails or takes too long - synchronized (stoppingMachine) { - // task also used as mutex by DST when it submits it; ensure it only submits once! - if (!stoppingMachine.isSubmitted()) { - // force the stoppingMachine task to run by submitting it here - StringBuilder msg = new StringBuilder("Submitting machine stop early in background for ").append(entity()); - if (stoppingProcess == null) { - msg.append(". Process stop skipped, pre-stop not finished?"); - } else { - msg.append(" because process stop has ").append( - (stoppingProcess.isDone() ? 
"finished abnormally" : "not finished")); - } - log.warn(msg.toString()); - Entities.submit(entity(), stoppingMachine); - } - } - } - - try { - // This maintains previous behaviour of silently squashing any errors on the stoppingProcess task if the - // stoppingMachine exits with a nonzero value - boolean checkStopProcesses = (stoppingProcess != null && (stoppingMachine == null || stoppingMachine.get().value == 0)); - - if (checkStopProcesses) { - // TODO we should test for destruction above, not merely successful "stop", as things like localhost and ssh won't be destroyed - DynamicTasks.waitForLast(); - if (machine.isPresent()) { - // throw early errors *only if* there is a machine and we have not destroyed it - stoppingProcess.get(); - } - } - } catch (Throwable e) { - ServiceStateLogic.setExpectedState(entity(), Lifecycle.ON_FIRE); - Exceptions.propagate(e); - } - entity().setAttribute(SoftwareProcess.SERVICE_UP, false); - ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPED); - - DynamicTasks.queue("post-stop", new PostStopCustomTask()); - - if (log.isDebugEnabled()) log.debug("Stopped software process entity "+entity()); - } - - private class StopAnyProvisionedMachinesTask implements Callable<StopMachineDetails<Integer>> { - public StopMachineDetails<Integer> call() { - return stopAnyProvisionedMachines(); - } - } - - private class SuspendAnyProvisionedMachinesTask implements Callable<StopMachineDetails<Integer>> { - public StopMachineDetails<Integer> call() { - return suspendAnyProvisionedMachines(); - } - } - - private class StopProcessesAtMachineTask implements Callable<String> { - public String call() { - DynamicTasks.markInessential(); - stopProcessesAtMachine(); - DynamicTasks.waitForLast(); - return "Stop processes completed with no errors."; - } - } - - private class StopMachineTask implements Callable<String> { - public String call() { - DynamicTasks.markInessential(); - 
stop(ConfigBag.newInstance().configure(StopSoftwareParameters.STOP_MACHINE_MODE, StopMode.IF_NOT_STOPPED)); - DynamicTasks.waitForLast(); - return "Stop of machine completed with no errors."; - } - } - - private class PreStopCustomTask implements Callable<String> { - public String call() { - if (entity().getAttribute(SoftwareProcess.SERVICE_STATE_ACTUAL) == Lifecycle.STOPPED) { - log.debug("Skipping stop of entity " + entity() + " when already stopped"); - return "Already stopped"; - } - ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPING); - entity().setAttribute(SoftwareProcess.SERVICE_UP, false); - preStopCustom(); - return null; - } - } - - private class PostStopCustomTask implements Callable<Void> { - public Void call() { - postStopCustom(); - return null; - } - } - - public static StopMode getStopMachineMode(ConfigBag parameters) { - final StopMode stopMachineMode = parameters.get(StopSoftwareParameters.STOP_MACHINE_MODE); - return stopMachineMode; - } - - public static boolean canStop(StopMode stopMode, Entity entity) { - boolean isEntityStopped = entity.getAttribute(SoftwareProcess.SERVICE_STATE_ACTUAL)==Lifecycle.STOPPED; - return canStop(stopMode, isEntityStopped); - } - - protected static boolean canStop(StopMode stopMode, boolean isStopped) { - return stopMode == StopMode.ALWAYS || - stopMode == StopMode.IF_NOT_STOPPED && !isStopped; - } - - /** - * Override to check whether stop can be executed. - * Throw if stop should be aborted. 
- */ - protected void preStopConfirmCustom() { - // nothing needed here - } - - protected void preStopCustom() { - // nothing needed here - } - - protected void postStopCustom() { - // nothing needed here - } - - protected void preRestartCustom() { - // nothing needed here - } - - protected void postRestartCustom() { - // nothing needed here - } - - public static class StopMachineDetails<T> implements Serializable { - private static final long serialVersionUID = 3256747214315895431L; - final String message; - final T value; - protected StopMachineDetails(String message, T value) { - this.message = message; - this.value = value; - } - @Override - public String toString() { - return message; - } - } - - /** - * Return string message of result. - * <p> - * Can run synchronously or not, caller will submit/queue as needed, and will block on any submitted tasks. - */ - protected abstract String stopProcessesAtMachine(); - - /** - * Stop and release the {@link MachineLocation} the entity is provisioned at. - * <p> - * Can run synchronously or not, caller will submit/queue as needed, and will block on any submitted tasks. - */ - protected StopMachineDetails<Integer> stopAnyProvisionedMachines() { - @SuppressWarnings("unchecked") - MachineProvisioningLocation<MachineLocation> provisioner = entity().getAttribute(SoftwareProcess.PROVISIONING_LOCATION); - - if (Iterables.isEmpty(entity().getLocations())) { - log.debug("No machine decommissioning necessary for "+entity()+" - no locations"); - return new StopMachineDetails<Integer>("No machine decommissioning necessary - no locations", 0); - } - - // Only release this machine if we ourselves provisioned it (e.g. 
it might be running other services) - if (provisioner==null) { - log.debug("No machine decommissioning necessary for "+entity()+" - did not provision"); - return new StopMachineDetails<Integer>("No machine decommissioning necessary - did not provision", 0); - } - - Location machine = getLocation(null); - if (!(machine instanceof MachineLocation)) { - log.debug("No decommissioning necessary for "+entity()+" - not a machine location ("+machine+")"); - return new StopMachineDetails<Integer>("No machine decommissioning necessary - not a machine ("+machine+")", 0); - } - - clearEntityLocationAttributes(machine); - provisioner.release((MachineLocation)machine); - - return new StopMachineDetails<Integer>("Decommissioned "+machine, 1); - } - - /** - * Suspend the {@link MachineLocation} the entity is provisioned at. - * <p> - * Expects the entity's {@link SoftwareProcess#PROVISIONING_LOCATION provisioner} to be capable of - * {@link SuspendsMachines suspending machines}. - * - * @throws java.lang.UnsupportedOperationException if the entity's provisioner cannot suspend machines. - * @see MachineManagementMixins.SuspendsMachines - */ - protected StopMachineDetails<Integer> suspendAnyProvisionedMachines() { - @SuppressWarnings("unchecked") - MachineProvisioningLocation<MachineLocation> provisioner = entity().getAttribute(SoftwareProcess.PROVISIONING_LOCATION); - - if (Iterables.isEmpty(entity().getLocations())) { - log.debug("No machine decommissioning necessary for " + entity() + " - no locations"); - return new StopMachineDetails<>("No machine suspend necessary - no locations", 0); - } - - // Only release this machine if we ourselves provisioned it (e.g. 
it might be running other services) - if (provisioner == null) { - log.debug("No machine decommissioning necessary for " + entity() + " - did not provision"); - return new StopMachineDetails<>("No machine suspend necessary - did not provision", 0); - } - - Location machine = getLocation(null); - if (!(machine instanceof MachineLocation)) { - log.debug("No decommissioning necessary for " + entity() + " - not a machine location (" + machine + ")"); - return new StopMachineDetails<>("No machine suspend necessary - not a machine (" + machine + ")", 0); - } - - if (!(provisioner instanceof SuspendsMachines)) { - log.debug("Location provisioner ({}) cannot suspend machines", provisioner); - throw new UnsupportedOperationException("Location provisioner cannot suspend machines: " + provisioner); - } - - clearEntityLocationAttributes(machine); - SuspendsMachines.class.cast(provisioner).suspendMachine(MachineLocation.class.cast(machine)); - - return new StopMachineDetails<>("Suspended " + machine, 1); - } - - /** - * Nulls the attached entity's hostname, address, subnet hostname and subnet address sensors - * and removes the given machine from its locations. - */ - protected void clearEntityLocationAttributes(Location machine) { - entity().removeLocations(ImmutableList.of(machine)); - entity().setAttribute(Attributes.HOSTNAME, null); - entity().setAttribute(Attributes.ADDRESS, null); - entity().setAttribute(Attributes.SUBNET_HOSTNAME, null); - entity().setAttribute(Attributes.SUBNET_ADDRESS, null); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/software/ProvidesProvisioningFlags.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/software/ProvidesProvisioningFlags.java b/software/base/src/main/java/brooklyn/entity/software/ProvidesProvisioningFlags.java deleted file mode 100644 index 3e47919..0000000 --- a/software/base/src/main/java/brooklyn/entity/software/ProvidesProvisioningFlags.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.software; - -import org.apache.brooklyn.api.location.MachineProvisioningLocation; -import org.apache.brooklyn.util.core.config.ConfigBag; - -import com.google.common.annotations.Beta; - -/** Marker interface for an entity which supplies custom machine provisioning flags; - * used e.g. in {@link MachineLifecycleEffectorTasks}. 
- * @since 0.6.0 */ -@Beta -public interface ProvidesProvisioningFlags { - - public ConfigBag obtainProvisioningFlags(MachineProvisioningLocation<?> location); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/software/SshEffectorTasks.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/software/SshEffectorTasks.java b/software/base/src/main/java/brooklyn/entity/software/SshEffectorTasks.java deleted file mode 100644 index bf1cf52..0000000 --- a/software/base/src/main/java/brooklyn/entity/software/SshEffectorTasks.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.software; - -import java.util.List; -import java.util.Map; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.api.effector.Effector; -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.config.StringConfigMap; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.config.ConfigUtils; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.effector.core.EffectorBody; -import org.apache.brooklyn.effector.core.EffectorTasks; -import org.apache.brooklyn.effector.core.EffectorTasks.EffectorTaskFactory; -import org.apache.brooklyn.entity.core.EntityInternal; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.SoftwareProcess; - -import org.apache.brooklyn.location.basic.LocationInternal; -import org.apache.brooklyn.location.basic.SshMachineLocation; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.core.internal.ssh.SshTool; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.core.task.ssh.SshFetchTaskFactory; -import org.apache.brooklyn.util.core.task.ssh.SshFetchTaskWrapper; -import org.apache.brooklyn.util.core.task.ssh.SshPutTaskFactory; -import org.apache.brooklyn.util.core.task.ssh.SshPutTaskWrapper; -import org.apache.brooklyn.util.core.task.ssh.SshTasks; -import org.apache.brooklyn.util.core.task.ssh.internal.AbstractSshExecTaskFactory; -import org.apache.brooklyn.util.core.task.ssh.internal.PlainSshExecTaskFactory; -import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory; -import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; -import org.apache.brooklyn.util.ssh.BashCommands; - -import com.google.common.annotations.Beta; -import com.google.common.base.Function; 
-import com.google.common.collect.Maps; - -/** - * Conveniences for generating {@link Task} instances to perform SSH activities. - * <p> - * If the {@link SshMachineLocation machine} is not specified directly it - * will be inferred from the {@link Entity} context of either the {@link Effector} - * or the current {@link Task}. - * - * @see SshTasks - * @since 0.6.0 - */ -@Beta -public class SshEffectorTasks { - - private static final Logger log = LoggerFactory.getLogger(SshEffectorTasks.class); - - public static final ConfigKey<Boolean> IGNORE_ENTITY_SSH_FLAGS = ConfigKeys.newBooleanConfigKey("ignoreEntitySshFlags", - "Whether to ignore any ssh flags (behaviour constraints) set on the entity or location " + - "where this is running, using only flags explicitly specified", false); - - /** like {@link EffectorBody} but providing conveniences when in a {@link SoftwareProcess} - * (or other entity with a single machine location) */ - public abstract static class SshEffectorBody<T> extends EffectorBody<T> { - - /** convenience for accessing the machine */ - public SshMachineLocation machine() { - return EffectorTasks.getSshMachine(entity()); - } - - /** convenience for generating an {@link PlainSshExecTaskFactory} which can be further customised if desired, and then (it must be explicitly) queued */ - public ProcessTaskFactory<Integer> ssh(String ...commands) { - return new SshEffectorTaskFactory<Integer>(commands).machine(machine()); - } - } - - /** variant of {@link PlainSshExecTaskFactory} which fulfills the {@link EffectorTaskFactory} signature so can be used directly as an impl for an effector, - * also injects the machine automatically; can also be used outwith effector contexts, and machine is still injected if it is - * run from inside a task at an entity with a single SshMachineLocation */ - public static class SshEffectorTaskFactory<RET> extends AbstractSshExecTaskFactory<SshEffectorTaskFactory<RET>,RET> implements EffectorTaskFactory<RET> { - - public 
SshEffectorTaskFactory(String ...commands) { - super(commands); - } - public SshEffectorTaskFactory(SshMachineLocation machine, String ...commands) { - super(machine, commands); - } - @Override - public ProcessTaskWrapper<RET> newTask(Entity entity, Effector<RET> effector, ConfigBag parameters) { - markDirty(); - if (summary==null) summary(effector.getName()+" (ssh)"); - machine(EffectorTasks.getSshMachine(entity)); - return newTask(); - } - @Override - public synchronized ProcessTaskWrapper<RET> newTask() { - Entity entity = BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()); - if (machine==null) { - if (log.isDebugEnabled()) - log.debug("Using an ssh task not in an effector without any machine; will attempt to infer the machine: "+this); - if (entity!=null) - machine(EffectorTasks.getSshMachine(entity)); - } - applySshFlags(getConfig(), entity, getMachine()); - return super.newTask(); - } - - @Override - public <T2> SshEffectorTaskFactory<T2> returning(ScriptReturnType type) { - return (SshEffectorTaskFactory<T2>) super.<T2>returning(type); - } - - @Override - public SshEffectorTaskFactory<Boolean> returningIsExitCodeZero() { - return (SshEffectorTaskFactory<Boolean>) super.returningIsExitCodeZero(); - } - - public SshEffectorTaskFactory<String> requiringZeroAndReturningStdout() { - return (SshEffectorTaskFactory<String>) super.requiringZeroAndReturningStdout(); - } - - public <RET2> SshEffectorTaskFactory<RET2> returning(Function<ProcessTaskWrapper<?>, RET2> resultTransformation) { - return (SshEffectorTaskFactory<RET2>) super.returning(resultTransformation); - } - } - - public static class SshPutEffectorTaskFactory extends SshPutTaskFactory implements EffectorTaskFactory<Void> { - public SshPutEffectorTaskFactory(String remoteFile) { - super(remoteFile); - } - public SshPutEffectorTaskFactory(SshMachineLocation machine, String remoteFile) { - super(machine, remoteFile); - } - @Override - public SshPutTaskWrapper newTask(Entity entity, Effector<Void> 
effector, ConfigBag parameters) { - machine(EffectorTasks.getSshMachine(entity)); - applySshFlags(getConfig(), entity, getMachine()); - return super.newTask(); - } - @Override - public SshPutTaskWrapper newTask() { - Entity entity = BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()); - if (machine==null) { - if (log.isDebugEnabled()) - log.debug("Using an ssh put task not in an effector without any machine; will attempt to infer the machine: "+this); - if (entity!=null) { - machine(EffectorTasks.getSshMachine(entity)); - } - - } - applySshFlags(getConfig(), entity, getMachine()); - return super.newTask(); - } - } - - public static class SshFetchEffectorTaskFactory extends SshFetchTaskFactory implements EffectorTaskFactory<String> { - public SshFetchEffectorTaskFactory(String remoteFile) { - super(remoteFile); - } - public SshFetchEffectorTaskFactory(SshMachineLocation machine, String remoteFile) { - super(machine, remoteFile); - } - @Override - public SshFetchTaskWrapper newTask(Entity entity, Effector<String> effector, ConfigBag parameters) { - machine(EffectorTasks.getSshMachine(entity)); - applySshFlags(getConfig(), entity, getMachine()); - return super.newTask(); - } - @Override - public SshFetchTaskWrapper newTask() { - Entity entity = BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()); - if (machine==null) { - if (log.isDebugEnabled()) - log.debug("Using an ssh fetch task not in an effector without any machine; will attempt to infer the machine: "+this); - if (entity!=null) - machine(EffectorTasks.getSshMachine(entity)); - } - applySshFlags(getConfig(), entity, getMachine()); - return super.newTask(); - } - } - - public static SshEffectorTaskFactory<Integer> ssh(String ...commands) { - return new SshEffectorTaskFactory<Integer>(commands); - } - - public static SshEffectorTaskFactory<Integer> ssh(List<String> commands) { - return ssh(commands.toArray(new String[commands.size()])); - } - - public static SshPutTaskFactory put(String remoteFile) { 
- return new SshPutEffectorTaskFactory(remoteFile); - } - - public static SshFetchEffectorTaskFactory fetch(String remoteFile) { - return new SshFetchEffectorTaskFactory(remoteFile); - } - - /** task which returns 0 if pid is running */ - public static SshEffectorTaskFactory<Integer> codePidRunning(Integer pid) { - return ssh("ps -p "+pid).summary("PID "+pid+" is-running check (exit code)").allowingNonZeroExitCode(); - } - - /** task which fails if the given PID is not running */ - public static SshEffectorTaskFactory<?> requirePidRunning(Integer pid) { - return codePidRunning(pid).summary("PID "+pid+" is-running check (required)").requiringExitCodeZero("Process with PID "+pid+" is required to be running"); - } - - /** as {@link #codePidRunning(Integer)} but returning boolean */ - public static SshEffectorTaskFactory<Boolean> isPidRunning(Integer pid) { - return codePidRunning(pid).summary("PID "+pid+" is-running check (boolean)").returning(new Function<ProcessTaskWrapper<?>, Boolean>() { - public Boolean apply(@Nullable ProcessTaskWrapper<?> input) { return Integer.valueOf(0).equals(input.getExitCode()); } - }); - } - - - /** task which returns 0 if pid in the given file is running; - * method accepts wildcards so long as they match a single file on the remote end - * <p> - * returns 1 if no matching file, - * 1 if matching file but no matching process, - * and 2 if 2+ matching files */ - public static SshEffectorTaskFactory<Integer> codePidFromFileRunning(final String pidFile) { - return ssh(BashCommands.chain( - // this fails, but isn't an error - BashCommands.requireTest("-f "+pidFile, "The PID file "+pidFile+" does not exist."), - // this fails and logs an error picked up later - BashCommands.requireTest("`ls "+pidFile+" | wc -w` -eq 1", "ERROR: there are multiple matching PID files"), - // this fails and logs an error picked up later - BashCommands.require("cat "+pidFile, "ERROR: the PID file "+pidFile+" cannot be read (permissions?)."), - // finally check 
the process - "ps -p `cat "+pidFile+"`")).summary("PID file "+pidFile+" is-running check (exit code)") - .allowingNonZeroExitCode() - .addCompletionListener(new Function<ProcessTaskWrapper<?>,Void>() { - public Void apply(ProcessTaskWrapper<?> input) { - if (input.getStderr().contains("ERROR:")) - throw new IllegalStateException("Invalid or inaccessible PID filespec: "+pidFile); - return null; - } - }); - } - - /** task which fails if the pid in the given file is not running (or if there is no such PID file); - * method accepts wildcards so long as they match a single file on the remote end (fails if 0 or 2+ matching files) */ - public static SshEffectorTaskFactory<?> requirePidFromFileRunning(String pidFile) { - return codePidFromFileRunning(pidFile) - .summary("PID file "+pidFile+" is-running check (required)") - .requiringExitCodeZero("Process with PID from file "+pidFile+" is required to be running"); - } - - /** as {@link #codePidFromFileRunning(String)} but returning boolean */ - public static SshEffectorTaskFactory<Boolean> isPidFromFileRunning(String pidFile) { - return codePidFromFileRunning(pidFile).summary("PID file "+pidFile+" is-running check (boolean)"). - returning(new Function<ProcessTaskWrapper<?>, Boolean>() { - public Boolean apply(@Nullable ProcessTaskWrapper<?> input) { return ((Integer)0).equals(input.getExitCode()); } - }); - } - - /** extracts the values for the main brooklyn.ssh.config.* config keys (i.e. those declared in ConfigKeys) - * as declared on the entity, and inserts them in a map using the unprefixed state, for ssh. - * <p> - * currently this is computed for each call, which may be wasteful, but it is reliable in the face of config changes. - * we could cache the Map. 
note that we do _not_ cache (or even own) the SshTool; - * the SshTool is created or re-used by the SshMachineLocation making use of these properties */ - @Beta - public static Map<String, Object> getSshFlags(Entity entity, Location optionalLocation) { - ConfigBag allConfig = ConfigBag.newInstance(); - - StringConfigMap globalConfig = ((EntityInternal)entity).getManagementContext().getConfig(); - allConfig.putAll(globalConfig.getAllConfig()); - - if (optionalLocation!=null) - allConfig.putAll(((LocationInternal)optionalLocation).config().getBag()); - - allConfig.putAll(((EntityInternal)entity).getAllConfig()); - - Map<String, Object> result = Maps.newLinkedHashMap(); - for (String keyS : allConfig.getAllConfig().keySet()) { - if (keyS.startsWith(SshTool.BROOKLYN_CONFIG_KEY_PREFIX)) { - ConfigKey<?> key = ConfigKeys.newConfigKey(Object.class, keyS); - - Object val = allConfig.getStringKey(keyS); - - /* - * NOV 2013 changing this to rely on config above being inserted in the right order, - * so entity config will be preferred over location, and location over global. - * If that is consistent then remove the lines below. - * (We can also accept null entity and so combine with SshTasks.getSshFlags.) 
- */ - -// // have to use raw config to test whether the config is set -// Object val = ((EntityInternal)entity).getConfigMap().getRawConfig(key); -// if (val!=null) { -// val = entity.getConfig(key); -// } else { -// val = globalConfig.getRawConfig(key); -// if (val!=null) val = globalConfig.getConfig(key); -// } -// if (val!=null) { - result.put(ConfigUtils.unprefixedKey(SshTool.BROOKLYN_CONFIG_KEY_PREFIX, key).getName(), val); -// } - } - } - return result; - } - - private static void applySshFlags(ConfigBag config, Entity entity, Location machine) { - if (entity!=null) { - if (!config.get(IGNORE_ENTITY_SSH_FLAGS)) { - config.putIfAbsent(getSshFlags(entity, machine)); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/software/StaticSensor.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/software/StaticSensor.java b/software/base/src/main/java/brooklyn/entity/software/StaticSensor.java deleted file mode 100644 index 05a92ed..0000000 --- a/software/base/src/main/java/brooklyn/entity/software/StaticSensor.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.software; - -import org.apache.brooklyn.api.internal.EntityLocal; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.effector.core.AddSensor; -import org.apache.brooklyn.sensor.enricher.Propagator; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.core.task.ValueResolver; -import org.apache.brooklyn.util.guava.Maybe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Supplier; - -/** - * Provides an initializer/feed which simply sets a given value. - * <p> - * {@link Task}/{@link Supplier} values are resolved when written, - * unlike config values which are resolved on each read. - * <p> - * This supports a {@link StaticSensor#SENSOR_PERIOD} - * which can be useful if the supplied value is such a function. - * However when the source is another sensor, - * consider using {@link Propagator} which listens for changes instead. 
*/ -public class StaticSensor<T> extends AddSensor<T> { - - private static final Logger log = LoggerFactory.getLogger(StaticSensor.class); - - public static final ConfigKey<Object> STATIC_VALUE = ConfigKeys.newConfigKey(Object.class, "static.value"); - - private final Object value; - - public StaticSensor(ConfigBag params) { - super(params); - value = params.get(STATIC_VALUE); - } - - @SuppressWarnings("unchecked") - @Override - public void apply(EntityLocal entity) { - super.apply(entity); - - Maybe<T> v = Tasks.resolving(value).as((Class<T>)sensor.getType()).timeout(ValueResolver.PRETTY_QUICK_WAIT).getMaybe(); - if (v.isPresent()) { - log.debug(this+" setting sensor "+sensor+" to "+v.get()); - entity.setAttribute(sensor, v.get()); - } else { - log.debug(this+" not setting sensor "+sensor+"; cannot resolve "+value); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/software/http/HttpRequestSensor.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/software/http/HttpRequestSensor.java b/software/base/src/main/java/brooklyn/entity/software/http/HttpRequestSensor.java deleted file mode 100644 index b2c68a6..0000000 --- a/software/base/src/main/java/brooklyn/entity/software/http/HttpRequestSensor.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.software.http; - -import java.net.URI; - -import net.minidev.json.JSONObject; - -import org.apache.brooklyn.api.internal.EntityLocal; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.effector.core.AddSensor; -import org.apache.brooklyn.sensor.feed.http.HttpFeed; -import org.apache.brooklyn.sensor.feed.http.HttpPollConfig; -import org.apache.brooklyn.sensor.feed.http.HttpValueFunctions; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.software.java.JmxAttributeSensor; -import brooklyn.entity.software.ssh.SshCommandSensor; - -import com.google.common.annotations.Beta; -import com.google.common.base.Functions; -import com.google.common.base.Supplier; - -/** - * Configurable {@link org.apache.brooklyn.api.entity.EntityInitializer} which adds an HTTP sensor feed to retrieve the - * {@link JSONObject} from a JSON response in order to populate the sensor with the data at the {@code jsonPath}. 
- * - * @see SshCommandSensor - * @see JmxAttributeSensor - */ -@Beta -public final class HttpRequestSensor<T> extends AddSensor<T> { - - private static final Logger LOG = LoggerFactory.getLogger(HttpRequestSensor.class); - - public static final ConfigKey<String> SENSOR_URI = ConfigKeys.newStringConfigKey("uri", "HTTP URI to poll for JSON"); - public static final ConfigKey<String> JSON_PATH = ConfigKeys.newStringConfigKey("jsonPath", "JSON path to select in HTTP response; default $", "$"); - public static final ConfigKey<String> USERNAME = ConfigKeys.newStringConfigKey("username", "Username for HTTP request, if required"); - public static final ConfigKey<String> PASSWORD = ConfigKeys.newStringConfigKey("password", "Password for HTTP request, if required"); - - protected final Supplier<URI> uri; - protected final String jsonPath; - protected final String username; - protected final String password; - - public HttpRequestSensor(final ConfigBag params) { - super(params); - - uri = new Supplier<URI>() { - @Override - public URI get() { - return URI.create(params.get(SENSOR_URI)); - } - }; - jsonPath = params.get(JSON_PATH); - username = params.get(USERNAME); - password = params.get(PASSWORD); - } - - @Override - public void apply(final EntityLocal entity) { - super.apply(entity); - - if (LOG.isDebugEnabled()) { - LOG.debug("Adding HTTP JSON sensor {} to {}", name, entity); - } - - HttpPollConfig<T> pollConfig = new HttpPollConfig<T>(sensor) - .checkSuccess(HttpValueFunctions.responseCodeEquals(200)) - .onFailureOrException(Functions.constant((T) null)) - .onSuccess(HttpValueFunctions.<T>jsonContentsFromPath(jsonPath)) - .period(period); - - HttpFeed.builder().entity(entity) - .baseUri(uri) - .credentialsIfNotNull(username, password) - .poll(pollConfig) - .build(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/software/java/JmxAttributeSensor.java 
---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/software/java/JmxAttributeSensor.java b/software/base/src/main/java/brooklyn/entity/software/java/JmxAttributeSensor.java deleted file mode 100644 index d66b67b..0000000 --- a/software/base/src/main/java/brooklyn/entity/software/java/JmxAttributeSensor.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.software.java; - -import java.util.concurrent.Callable; - -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.brooklyn.api.internal.EntityLocal; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.effector.core.AddSensor; -import org.apache.brooklyn.sensor.core.DependentConfiguration; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.core.task.DynamicTasks; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.java.UsesJmx; -import brooklyn.entity.software.http.HttpRequestSensor; -import brooklyn.entity.software.ssh.SshCommandSensor; -import brooklyn.event.feed.jmx.JmxAttributePollConfig; -import brooklyn.event.feed.jmx.JmxFeed; -import brooklyn.event.feed.jmx.JmxHelper; - -import com.google.common.annotations.Beta; -import com.google.common.base.Functions; -import com.google.common.base.Preconditions; - -/** - * Configurable {@link org.apache.brooklyn.api.entity.EntityInitializer} which adds a JMX sensor feed to retrieve an - * <code>attribute</code> from a JMX <code>objectName</code>. 
- * - * @see SshCommandSensor - * @see HttpRequestSensor - */
@Beta
public final class JmxAttributeSensor<T> extends AddSensor<T> {

    private static final Logger LOG = LoggerFactory.getLogger(JmxAttributeSensor.class);

    public static final ConfigKey<String> OBJECT_NAME = ConfigKeys.newStringConfigKey("objectName", "JMX object name for sensor lookup");
    public static final ConfigKey<String> ATTRIBUTE = ConfigKeys.newStringConfigKey("attribute", "JMX attribute to poll in object");
    public static final ConfigKey<Object> DEFAULT_VALUE = ConfigKeys.newConfigKey(Object.class, "defaultValue", "Default value for sensor; normally null");

    protected final String objectName;
    protected final String attribute;
    protected final Object defaultValue;

    /**
     * Reads {@link #OBJECT_NAME}, {@link #ATTRIBUTE} and {@link #DEFAULT_VALUE} from the supplied parameters.
     *
     * @param params configuration for this sensor; {@code objectName} and {@code attribute} are mandatory
     * @throws NullPointerException if {@code objectName} or {@code attribute} is absent
     * @throws IllegalArgumentException if {@code objectName} is not a syntactically valid JMX object name
     */
    public JmxAttributeSensor(final ConfigBag params) {
        super(params);

        objectName = Preconditions.checkNotNull(params.get(OBJECT_NAME), "objectName");
        attribute = Preconditions.checkNotNull(params.get(ATTRIBUTE), "attribute");
        defaultValue = params.get(DEFAULT_VALUE);

        // Parse the name now so that a typo is reported when the sensor is defined,
        // not later when the feed first polls.
        try {
            ObjectName.getInstance(objectName);
        } catch (MalformedObjectNameException mone) {
            throw new IllegalArgumentException("Malformed JMX object name: " + objectName, mone);
        }
    }

    /**
     * Submits, in the context of the entity, a sequential task made of the
     * {@code attributeWhenReady} task for {@link UsesJmx#JMX_PORT} followed by a task that
     * builds a {@link JmxFeed} polling {@link #attribute} of {@link #objectName}.
     * On failure or exception the poll publishes {@link #defaultValue}.
     *
     * @param entity the entity to attach the sensor feed to
     * @throws IllegalStateException if the entity is not a {@link UsesJmx}
     */
    @Override
    public void apply(final EntityLocal entity) {
        super.apply(entity);

        if (!(entity instanceof UsesJmx)) {
            throw new IllegalStateException(String.format("Entity %s does not support JMX", entity));
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Submitting task to add JMX sensor {} to {}", name, entity);
        }

        // T is erased at runtime, so this cast cannot be checked; the configured default is passed through as-is.
        @SuppressWarnings("unchecked")
        final T fallbackValue = (T) defaultValue;

        Task<Integer> jmxPortTask = DependentConfiguration.attributeWhenReady(entity, UsesJmx.JMX_PORT);
        Task<JmxFeed> jmxFeedTask = Tasks.<JmxFeed>builder()
                .description("Add JMX feed")
                .body(new Callable<JmxFeed>() {
                    @Override
                    public JmxFeed call() throws Exception {
                        JmxHelper helper = new JmxHelper(entity);

                        // Use the period this sensor was configured with (the inherited field),
                        // as the HTTP and SSH sensors do, rather than looking the key up on the entity.
                        return JmxFeed.builder()
                                .entity(entity)
                                .period(period)
                                .helper(helper)
                                .pollAttribute(new JmxAttributePollConfig<T>(sensor)
                                        .objectName(objectName)
                                        .attributeName(attribute)
                                        .onFailureOrException(Functions.<T>constant(fallbackValue)))
                                .build();
                    }
                })
                .build();
        DynamicTasks.submit(Tasks.sequential("Add JMX Sensor " + sensor.getName(), jmxPortTask, jmxFeedTask), entity);

        // TODO add entity shutdown hook to stop JmxFeed
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/software/ssh/SshCommandEffector.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/software/ssh/SshCommandEffector.java b/software/base/src/main/java/brooklyn/entity/software/ssh/SshCommandEffector.java deleted file mode 100644 index 4c3dcbe..0000000 --- a/software/base/src/main/java/brooklyn/entity/software/ssh/SshCommandEffector.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ -package brooklyn.entity.software.ssh; - -import java.util.Map; - -import org.apache.brooklyn.api.effector.Effector; -import org.apache.brooklyn.api.effector.ParameterType; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.effector.core.AddEffector; -import org.apache.brooklyn.effector.core.EffectorBody; -import org.apache.brooklyn.effector.core.Effectors; -import org.apache.brooklyn.effector.core.Effectors.EffectorBuilder; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.text.Strings; - -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.software.SshEffectorTasks; -import brooklyn.entity.software.SshEffectorTasks.SshEffectorTaskFactory; - -import com.google.common.base.Preconditions; -
/**
 * An {@link AddEffector} whose effector body runs a configured shell command through
 * {@link SshEffectorTasks} and returns the result of that task.
 */
public final class SshCommandEffector extends AddEffector {

    public static final ConfigKey<String> EFFECTOR_COMMAND = ConfigKeys.newStringConfigKey("command");
    public static final ConfigKey<String> EFFECTOR_EXECUTION_DIR = SshCommandSensor.SENSOR_EXECUTION_DIR;

    /** Builds the effector from the given configuration bag. */
    public SshCommandEffector(ConfigBag params) {
        super(newEffectorBuilder(params).build());
    }

    /** Convenience constructor wrapping a plain string map in a {@link ConfigBag}. */
    public SshCommandEffector(Map<String,String> params) {
        this(ConfigBag.newInstance(params));
    }

    /**
     * Creates a String-returning effector builder from {@code params} and installs a
     * {@link Body} as its implementation.
     *
     * @param params effector definition, including the {@code command} to run
     * @return the builder, with its implementation already set
     */
    public static EffectorBuilder<String> newEffectorBuilder(ConfigBag params) {
        EffectorBuilder<String> builder = AddEffector.newEffectorBuilder(String.class, params);
        Body implementation = new Body(builder.buildAbstract(), params);
        builder.impl(implementation);
        return builder;
    }


    /** Effector body that assembles the shell environment and queues the SSH task. */
    protected static class Body extends EffectorBody<String> {
        private final Effector<?> effector;
        private final String command;
        private final String executionDir;

        /**
         * @param eff the effector definition whose declared parameters are exported to the shell
         * @param params definition-time configuration; must contain {@link #EFFECTOR_COMMAND}
         */
        public Body(Effector<?> eff, ConfigBag params) {
            this.effector = eff;
            this.command = Preconditions.checkNotNull(params.get(EFFECTOR_COMMAND), "command must be supplied when defining this effector");
            this.executionDir = params.get(EFFECTOR_EXECUTION_DIR);
            // TODO could take a custom "env" aka effectorShellEnv
        }

        /**
         * Runs the configured command and returns the value produced by the queued SSH task.
         * The environment is layered: declared effector parameters, then the entity's
         * {@code SHELL_ENVIRONMENT} config, then every value in the invocation parameters.
         *
         * @param params the invocation-time parameters
         */
        @Override
        public String call(ConfigBag params) {
            final String shellCommand =
                    SshCommandSensor.makeCommandExecutingInDirectory(this.command, executionDir, entity());

            final MutableMap<String, String> shellEnv = MutableMap.of();

            // Layer 1: every declared parameter, so that defaults are exported too.
            for (ParameterType<?> declared : effector.getParameters()) {
                shellEnv.addIfNotNull(declared.getName(), Strings.toString(params.get(Effectors.asConfigKey(declared))));
            }

            // Layer 2: the shell environment configured on the entity, if any.
            // (The surrounding environment is not resolved in real time here; see the TODO in the constructor.)
            shellEnv.putAll(Strings.toStringMap(entity().getConfig(SoftwareProcess.SHELL_ENVIRONMENT), ""));

            // Layer 3: whatever was actually passed in. This repeats the declared parameters harmlessly
            // and may add undeclared ones (a flag could control whether that is permitted).
            shellEnv.putAll(Strings.toStringMap(params.getAllConfig()));

            SshEffectorTaskFactory<String> sshTask = SshEffectorTasks.ssh(shellCommand)
                    .requiringZeroAndReturningStdout()
                    .summary("effector "+effector.getName())
                    .environmentVariables(shellEnv);
            return queue(sshTask).get();
        }

    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/software/ssh/SshCommandSensor.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/software/ssh/SshCommandSensor.java b/software/base/src/main/java/brooklyn/entity/software/ssh/SshCommandSensor.java deleted file mode 100644 index fa51664..0000000 --- a/software/base/src/main/java/brooklyn/entity/software/ssh/SshCommandSensor.java +++ /dev/null @@
-1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.software.ssh; - -import java.util.Map; - -import org.apache.brooklyn.api.entity.EntityInitializer; -import org.apache.brooklyn.api.internal.EntityLocal; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.effector.core.AddSensor; -import org.apache.brooklyn.sensor.feed.ssh.SshFeed; -import org.apache.brooklyn.sensor.feed.ssh.SshPollConfig; -import org.apache.brooklyn.sensor.feed.ssh.SshValueFunctions; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.core.flags.TypeCoercions; -import org.apache.brooklyn.util.os.Os; -import org.apache.brooklyn.util.text.Strings; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.software.http.HttpRequestSensor; -import brooklyn.entity.software.java.JmxAttributeSensor; - -import com.google.common.annotations.Beta; -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Preconditions; 
-import com.google.common.base.Supplier; - -/** - * Configurable {@link EntityInitializer} which adds an SSH sensor feed running the <code>command</code> supplied - * in order to populate the sensor with the indicated <code>name</code>. Note that the <code>targetType</code> is ignored, - * and always set to {@link String}. - * - * @see HttpRequestSensor - * @see JmxAttributeSensor - */ -@Beta -public final class SshCommandSensor<T> extends AddSensor<T> { - - private static final Logger LOG = LoggerFactory.getLogger(SshCommandSensor.class); - - public static final ConfigKey<String> SENSOR_COMMAND = ConfigKeys.newStringConfigKey("command", "SSH command to execute for sensor"); - public static final ConfigKey<String> SENSOR_EXECUTION_DIR = ConfigKeys.newStringConfigKey("executionDir", "Directory where the command should run; " - + "if not supplied, executes in the entity's run dir (or home dir if no run dir is defined); " - + "use '~' to always execute in the home dir, or 'custom-feed/' to execute in a custom-feed dir relative to the run dir"); - - protected final String command; - protected final String executionDir; - - public SshCommandSensor(final ConfigBag params) { - super(params); - - // TODO create a supplier for the command string to support attribute embedding - command = Preconditions.checkNotNull(params.get(SENSOR_COMMAND), "command"); - - executionDir = params.get(SENSOR_EXECUTION_DIR); - } - - @Override - public void apply(final EntityLocal entity) { - super.apply(entity); - - if (LOG.isDebugEnabled()) { - LOG.debug("Adding SSH sensor {} to {}", name, entity); - } - - Supplier<Map<String,String>> envSupplier = new Supplier<Map<String,String>>() { - @Override - public Map<String, String> get() { - return MutableMap.copyOf(Strings.toStringMap(entity.getConfig(SoftwareProcess.SHELL_ENVIRONMENT), "")); - } - }; - - Supplier<String> commandSupplier = new Supplier<String>() { - @Override - public String get() { - return makeCommandExecutingInDirectory(command, 
executionDir, entity); - } - }; - - SshPollConfig<T> pollConfig = new SshPollConfig<T>(sensor) - .period(period) - .env(envSupplier) - .command(commandSupplier) - .checkSuccess(SshValueFunctions.exitStatusEquals(0)) - .onFailureOrException(Functions.constant((T) null)) - .onSuccess(Functions.compose(new Function<String, T>() { - @Override - public T apply(String input) { - return TypeCoercions.coerce(input, getType(type)); - }}, SshValueFunctions.stdout())); - - SshFeed.builder() - .entity(entity) - .onlyIfServiceUp() - .poll(pollConfig) - .build(); - } - - static String makeCommandExecutingInDirectory(String command, String executionDir, EntityLocal entity) { - String finalCommand = command; - String execDir = executionDir; - if (Strings.isBlank(execDir)) { - // default to run dir - execDir = entity.getAttribute(SoftwareProcess.RUN_DIR); - // if no run dir, default to home - if (Strings.isBlank(execDir)) { - execDir = "~"; - } - } else if (!Os.isAbsolutish(execDir)) { - // relative paths taken wrt run dir - String runDir = entity.getAttribute(SoftwareProcess.RUN_DIR); - if (!Strings.isBlank(runDir)) { - execDir = Os.mergePaths(runDir, execDir); - } - } - if (!"~".equals(execDir)) { - finalCommand = "mkdir -p '"+execDir+"' && cd '"+execDir+"' && "+finalCommand; - } - return finalCommand; - } - -}
