http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/pool/ServerPoolImpl.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/pool/ServerPoolImpl.java b/software/base/src/main/java/brooklyn/entity/pool/ServerPoolImpl.java deleted file mode 100644 index 0076f9e..0000000 --- a/software/base/src/main/java/brooklyn/entity/pool/ServerPoolImpl.java +++ /dev/null @@ -1,432 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.pool; - -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.location.LocationDefinition; -import org.apache.brooklyn.api.location.MachineLocation; -import org.apache.brooklyn.api.location.NoMachinesAvailableException; -import org.apache.brooklyn.api.mgmt.LocationManager; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.api.policy.PolicySpec; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.effector.core.Effectors; -import org.apache.brooklyn.entity.core.Attributes; -import org.apache.brooklyn.entity.core.EntityInternal; -import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy; -import org.apache.brooklyn.entity.group.DynamicClusterImpl; -import org.apache.brooklyn.entity.lifecycle.Lifecycle; -import org.apache.brooklyn.entity.trait.Startable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.location.basic.BasicLocationDefinition; -import org.apache.brooklyn.location.basic.Machines; -import org.apache.brooklyn.location.dynamic.DynamicLocation; -import org.apache.brooklyn.sensor.core.Sensors; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.task.DynamicTasks; -import org.apache.brooklyn.util.guava.Maybe; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.reflect.TypeToken; - -public class ServerPoolImpl extends DynamicClusterImpl implements ServerPool { - - private static final Logger LOG = LoggerFactory.getLogger(ServerPoolImpl.class); - - private static enum MachinePoolMemberStatus { - /** The server is available for use */ - AVAILABLE, - /** The server has been leased to another application */ - CLAIMED, - /** - * The server will not be leased to other applications. It will be the first - * candidate to release when the pool is shrunk. - */ - UNUSABLE - } - - private static final AttributeSensor<MachinePoolMemberStatus> SERVER_STATUS = Sensors.newSensor(MachinePoolMemberStatus.class, - "pool.serverStatus", "The status of an entity in the pool"); - - // The sensors here would be better as private fields but there's not really a - // good way to manage their state when rebinding. - - /** Accesses must be synchronised by mutex */ - // Would use BiMap but persisting them tends to throw ConcurrentModificationExceptions. - @SuppressWarnings("serial") - public static final AttributeSensor<Map<Entity, MachineLocation>> ENTITY_MACHINE = Sensors.newSensor(new TypeToken<Map<Entity, MachineLocation>>() {}, - "pool.entityMachineMap", "A mapping of entities and their machine locations"); - - @SuppressWarnings("serial") - public static final AttributeSensor<Map<MachineLocation, Entity>> MACHINE_ENTITY = Sensors.newSensor(new TypeToken<Map<MachineLocation, Entity>>() {}, - "pool.machineEntityMap", "A mapping of machine locations and their entities"); - - public static final AttributeSensor<LocationDefinition> DYNAMIC_LOCATION_DEFINITION = Sensors.newSensor(LocationDefinition.class, - "pool.locationDefinition", "The location definition used to create the pool's dynamic location"); - - public static final ConfigKey<Boolean> REMOVABLE = ConfigKeys.newBooleanConfigKey( - "pool.member.removable", "Whether a pool member is removable from the cluster. Used to denote additional " + - "existing machines that were manually added to the pool", true); - - @SuppressWarnings("unused") - private MemberTrackingPolicy membershipTracker; - - @Override - public void init() { - super.init(); - setAttribute(AVAILABLE_COUNT, 0); - setAttribute(CLAIMED_COUNT, 0); - setAttribute(ENTITY_MACHINE, Maps.<Entity, MachineLocation>newHashMap()); - setAttribute(MACHINE_ENTITY, Maps.<MachineLocation, Entity>newHashMap()); - } - - @Override - public void start(Collection<? extends Location> locations) { - // super.start must happen before the policy is added else the initial - // members wont be up (and thus have a MachineLocation) when onEntityAdded - // is called. - super.start(locations); - createLocation(); - addMembershipTrackerPolicy(); - } - - @Override - public void rebind() { - super.rebind(); - addMembershipTrackerPolicy(); - createLocation(); - } - - @Override - public void stop() { - super.stop(); - deleteLocation(); - synchronized (mutex) { - setAttribute(AVAILABLE_COUNT, 0); - setAttribute(CLAIMED_COUNT, 0); - getAttribute(ENTITY_MACHINE).clear(); - getAttribute(MACHINE_ENTITY).clear(); - } - } - - private void addMembershipTrackerPolicy() { - membershipTracker = addPolicy(PolicySpec.create(MemberTrackingPolicy.class) - .displayName(getDisplayName() + " membership tracker") - .configure("group", this)); - } - - @Override - public ServerPoolLocation getDynamicLocation() { - return (ServerPoolLocation) getAttribute(DYNAMIC_LOCATION); - } - - protected ServerPoolLocation createLocation() { - return createLocation(MutableMap.<String, Object>builder() - .putAll(getConfig(LOCATION_FLAGS)) - .put(DynamicLocation.OWNER.getName(), this) - .build()); - } - - @Override - public ServerPoolLocation createLocation(Map<String, ?> flags) { - String locationName = getConfig(LOCATION_NAME); - if (locationName == null) { - String prefix = getConfig(LOCATION_NAME_PREFIX); - String suffix = getConfig(LOCATION_NAME_SUFFIX); - locationName = Joiner.on("-").skipNulls().join(prefix, getId(), suffix); - } - - String locationSpec = String.format(ServerPoolLocationResolver.POOL_SPEC, getId()) + String.format(":(name=\"%s\")", locationName); - LocationDefinition definition = new BasicLocationDefinition(locationName, locationSpec, flags); - getManagementContext().getLocationRegistry().updateDefinedLocation(definition); - Location location = getManagementContext().getLocationRegistry().resolve(definition); - LOG.info("Resolved and registered dynamic location {}: {}", locationName, location); - - setAttribute(LOCATION_SPEC, locationSpec); - setAttribute(DYNAMIC_LOCATION, location); - setAttribute(LOCATION_NAME, location.getId()); - setAttribute(DYNAMIC_LOCATION_DEFINITION, definition); - - return (ServerPoolLocation) location; - } - - @Override - public void deleteLocation() { - LocationManager mgr = getManagementContext().getLocationManager(); - ServerPoolLocation location = getDynamicLocation(); - if (mgr.isManaged(location)) { - LOG.debug("{} deleting and unmanaging location {}", this, location); - mgr.unmanage(location); - } - // definition will only be null if deleteLocation has already been called, e.g. by two calls to stop(). - LocationDefinition definition = getAttribute(DYNAMIC_LOCATION_DEFINITION); - if (definition != null) { - LOG.debug("{} unregistering dynamic location {}", this, definition); - getManagementContext().getLocationRegistry().removeDefinedLocation(definition.getId()); - } - setAttribute(LOCATION_SPEC, null); - setAttribute(DYNAMIC_LOCATION, null); - setAttribute(LOCATION_NAME, null); - setAttribute(DYNAMIC_LOCATION_DEFINITION, null); - } - - @Override - public boolean isLocationAvailable() { - // FIXME: What do true/false mean to callers? - // Is it valid to return false if availableMachines is empty? - return getDynamicLocation() != null; - } - - @Override - public MachineLocation claimMachine(Map<?, ?> flags) throws NoMachinesAvailableException { - LOG.info("Obtaining machine with flags: {}", Joiner.on(", ").withKeyValueSeparator("=").join(flags)); - synchronized (mutex) { - Optional<Entity> claimed = getMemberWithStatus(MachinePoolMemberStatus.AVAILABLE); - if (claimed.isPresent()) { - setEntityStatus(claimed.get(), MachinePoolMemberStatus.CLAIMED); - updateCountSensors(); - LOG.debug("{} has been claimed in {}", claimed, this); - return getEntityMachineMap().get(claimed.get()); - } else { - throw new NoMachinesAvailableException("No machines available in " + this); - } - } - } - - @Override - public void releaseMachine(MachineLocation machine) { - synchronized (mutex) { - Entity entity = getMachineEntityMap().get(machine); - if (entity == null) { - LOG.warn("{} releasing machine {} but its owning entity is not known!", this, machine); - } else { - setEntityStatus(entity, MachinePoolMemberStatus.AVAILABLE); - updateCountSensors(); - LOG.debug("{} has been released in {}", machine, this); - } - } - } - - @Override - public Entity addExistingMachine(MachineLocation machine) { - LOG.info("Adding additional machine to {}: {}", this, machine); - Entity added = addNode(machine, MutableMap.of(REMOVABLE, false)); - Map<String, ?> args = ImmutableMap.of("locations", ImmutableList.of(machine)); - Task<Void> task = Effectors.invocation(added, Startable.START, args).asTask(); - DynamicTasks.queueIfPossible(task).orSubmitAsync(this); - return added; - } - - @Override - public Collection<Entity> addExistingMachinesFromSpec(String spec) { - Location location = getManagementContext().getLocationRegistry().resolve(spec, true, null).orNull(); - List<Entity> additions = Lists.newLinkedList(); - if (location == null) { - LOG.warn("Spec was unresolvable: {}", spec); - } else { - Iterable<MachineLocation> machines = FluentIterable.from(location.getChildren()) - .filter(MachineLocation.class); - LOG.info("{} adding additional machines: {}", this, machines); - // Doesn't need to be synchronised on mutex: it will be claimed per-machine - // as the new members are handled by the membership tracking policy. - for (MachineLocation machine : machines) { - additions.add(addExistingMachine(machine)); - } - LOG.debug("{} added additional machines", this); - } - return additions; - } - - /** - * Overrides to restrict delta to the number of machines that can be <em>safely</em> - * removed (i.e. those that are {@link MachinePoolMemberStatus#UNUSABLE unusable} or - * {@link MachinePoolMemberStatus#AVAILABLE available}). - * <p/> - * Does not modify delta if the pool is stopping. - * @param delta Requested number of members to remove - * @return The entities that were removed - */ - @Override - protected Collection<Entity> shrink(int delta) { - if (Lifecycle.STOPPING.equals(getAttribute(Attributes.SERVICE_STATE_ACTUAL))) { - return super.shrink(delta); - } - - synchronized (mutex) { - int removable = 0; - for (Entity entity : getMembers()) { - // Skip machine marked not for removal and machines that are claimed - if (!Boolean.FALSE.equals(entity.getConfig(REMOVABLE)) && - !MachinePoolMemberStatus.CLAIMED.equals(entity.getAttribute(SERVER_STATUS))) { - removable -= 1; - } - } - - if (delta < removable) { - LOG.warn("Too few removable machines in {} to shrink by delta {}. Altered delta to {}", - new Object[]{this, delta, removable}); - delta = removable; - } - - Collection<Entity> removed = super.shrink(delta); - updateCountSensors(); - return removed; - } - } - - private Map<Entity, MachineLocation> getEntityMachineMap() { - return getAttribute(ENTITY_MACHINE); - } - - private Map<MachineLocation, Entity> getMachineEntityMap() { - return getAttribute(MACHINE_ENTITY); - } - - @Override - public Function<Collection<Entity>, Entity> getRemovalStrategy() { - return UNCLAIMED_REMOVAL_STRATEGY; - } - - private final Function<Collection<Entity>, Entity> UNCLAIMED_REMOVAL_STRATEGY = new Function<Collection<Entity>, Entity>() { - // Semantics of superclass mean that mutex should already be held when apply is called - @Override - public Entity apply(Collection<Entity> members) { - synchronized (mutex) { - Optional<Entity> choice; - if (Lifecycle.STOPPING.equals(getAttribute(Attributes.SERVICE_STATE_ACTUAL))) { - choice = Optional.of(members.iterator().next()); - } else { - // Otherwise should only choose between removable + unusable or available - choice = getMemberWithStatusExcludingUnremovable(members, MachinePoolMemberStatus.UNUSABLE) - .or(getMemberWithStatusExcludingUnremovable(members, MachinePoolMemberStatus.AVAILABLE)); - } - if (!choice.isPresent()) { - LOG.warn("{} has no machines available to remove!", this); - return null; - } else { - LOG.info("{} selected entity to remove from pool: {}", this, choice.get()); - choice.get().getAttribute(SERVER_STATUS); - setEntityStatus(choice.get(), null); - } - MachineLocation entityLocation = getEntityMachineMap().remove(choice.get()); - if (entityLocation != null) { - getMachineEntityMap().remove(entityLocation); - } - return choice.get(); - } - } - }; - - private void serverAdded(Entity member) { - Maybe<MachineLocation> machine = Machines.findUniqueMachineLocation(member.getLocations()); - if (member.getAttribute(SERVER_STATUS) != null) { - LOG.debug("Skipped addition of machine already in the pool: {}", member); - } else if (machine.isPresentAndNonNull()) { - MachineLocation m = machine.get(); - LOG.info("New machine in {}: {}", this, m); - setEntityStatus(member, MachinePoolMemberStatus.AVAILABLE); - synchronized (mutex) { - getEntityMachineMap().put(member, m); - getMachineEntityMap().put(m, member); - updateCountSensors(); - } - } else { - LOG.warn("Member added to {} that does not have a machine location; it will not be used by the pool: {}", - ServerPoolImpl.this, member); - setEntityStatus(member, MachinePoolMemberStatus.UNUSABLE); - } - } - - private void setEntityStatus(Entity entity, MachinePoolMemberStatus status) { - ((EntityInternal) entity).setAttribute(SERVER_STATUS, status); - } - - private Optional<Entity> getMemberWithStatus(MachinePoolMemberStatus status) { - return getMemberWithStatus0(getMembers(), status, true); - } - - private Optional<Entity> getMemberWithStatusExcludingUnremovable(Collection<Entity> entities, MachinePoolMemberStatus status) { - return getMemberWithStatus0(entities, status, false); - } - - private Optional<Entity> getMemberWithStatus0(Collection<Entity> entities, final MachinePoolMemberStatus status, final boolean includeUnremovableMachines) { - return Iterables.tryFind(entities, - new Predicate<Entity>() { - @Override - public boolean apply(Entity input) { - return (includeUnremovableMachines || isRemovable(input)) && - status.equals(input.getAttribute(SERVER_STATUS)); - } - }); - } - - /** @return true if the entity has {@link #REMOVABLE} set to null or true. */ - private boolean isRemovable(Entity entity) { - return !Boolean.FALSE.equals(entity.getConfig(REMOVABLE)); - } - - private void updateCountSensors() { - synchronized (mutex) { - int available = 0, claimed = 0; - for (Entity member : getMembers()) { - MachinePoolMemberStatus status = member.getAttribute(SERVER_STATUS); - if (MachinePoolMemberStatus.AVAILABLE.equals(status)) { - available++; - } else if (MachinePoolMemberStatus.CLAIMED.equals(status)) { - claimed++; - } - } - setAttribute(AVAILABLE_COUNT, available); - setAttribute(CLAIMED_COUNT, claimed); - } - } - - public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { - @Override - protected void onEntityEvent(EventType type, Entity member) { - Boolean isUp = member.getAttribute(Attributes.SERVICE_UP); - LOG.info("{} in {}: {} service up is {}", new Object[]{type.name(), entity, member, isUp}); - if (type.equals(EventType.ENTITY_ADDED) || type.equals(EventType.ENTITY_CHANGE)) { - if (Boolean.TRUE.equals(isUp)) { - ((ServerPoolImpl) entity).serverAdded(member); - } else if (LOG.isDebugEnabled()) { - LOG.debug("{} observed event {} but {} is not up (yet) and will not be used by the pool", - new Object[]{entity, type.name(), member}); - } - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/pool/ServerPoolLocation.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/pool/ServerPoolLocation.java b/software/base/src/main/java/brooklyn/entity/pool/ServerPoolLocation.java deleted file mode 100644 index b748a24..0000000 --- a/software/base/src/main/java/brooklyn/entity/pool/ServerPoolLocation.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.pool; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Collection; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Maps; - -import org.apache.brooklyn.api.location.MachineLocation; -import org.apache.brooklyn.api.location.MachineProvisioningLocation; -import org.apache.brooklyn.api.location.NoMachinesAvailableException; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.location.basic.AbstractLocation; -import org.apache.brooklyn.location.dynamic.DynamicLocation; -import org.apache.brooklyn.util.core.flags.SetFromFlag; - -public class ServerPoolLocation extends AbstractLocation implements MachineProvisioningLocation<MachineLocation>, - DynamicLocation<ServerPool, ServerPoolLocation> { - - private static final long serialVersionUID = -6771844611899475409L; - - private static final Logger LOG = LoggerFactory.getLogger(ServerPoolLocation.class); - - @SetFromFlag("owner") - public static final ConfigKey<ServerPool> OWNER = ConfigKeys.newConfigKey( - ServerPool.class, "pool.location.owner"); - - @Override - public void init() { - LOG.debug("Initialising. Owner is: {}", checkNotNull(getConfig(OWNER), OWNER.getName())); - super.init(); - } - - @Override - public ServerPool getOwner() { - return getConfig(OWNER); - } - - @Override - public MachineLocation obtain(Map<?, ?> flags) throws NoMachinesAvailableException { - // Call server pool and try to obtain one of its machines - return getOwner().claimMachine(flags); - } - - @Override - public MachineProvisioningLocation<MachineLocation> newSubLocation(Map<?, ?> newFlags) { - throw new UnsupportedOperationException(); - } - - @Override - public void release(MachineLocation machine) { - getOwner().releaseMachine(machine); - } - - @Override - public Map<String, Object> getProvisioningFlags(Collection<String> tags) { - return Maps.newLinkedHashMap(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/pool/ServerPoolLocationResolver.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/pool/ServerPoolLocationResolver.java b/software/base/src/main/java/brooklyn/entity/pool/ServerPoolLocationResolver.java deleted file mode 100644 index cd32156..0000000 --- a/software/base/src/main/java/brooklyn/entity/pool/ServerPoolLocationResolver.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.pool; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.location.Location; -import org.apache.brooklyn.api.location.LocationRegistry; -import org.apache.brooklyn.api.location.LocationSpec; -import org.apache.brooklyn.api.location.LocationResolver.EnableableLocationResolver; -import org.apache.brooklyn.api.mgmt.ManagementContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; - -import org.apache.brooklyn.location.basic.BasicLocationRegistry; -import org.apache.brooklyn.location.basic.LocationInternal; -import org.apache.brooklyn.location.basic.LocationPropertiesFromBrooklynProperties; -import org.apache.brooklyn.location.dynamic.DynamicLocation; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.text.KeyValueParser; -import org.apache.brooklyn.util.text.Strings; - -public class ServerPoolLocationResolver implements EnableableLocationResolver { - - private static final Logger LOG = LoggerFactory.getLogger(ServerPoolLocationResolver.class); - private static final String PREFIX = "pool"; - public static final String POOL_SPEC = PREFIX + ":%s"; - private static final Pattern PATTERN = Pattern.compile("("+PREFIX+"|"+PREFIX.toUpperCase()+")" + - ":([a-zA-Z0-9]+)" + // pool Id - "(:\\((.*)\\))?$"); // arguments, e.g. displayName - - private static final Set<String> ACCEPTABLE_ARGS = ImmutableSet.of("name", "displayName"); - - private ManagementContext managementContext; - - @Override - public boolean isEnabled() { - return true; - } - - @Override - public void init(ManagementContext managementContext) { - this.managementContext = checkNotNull(managementContext, "managementContext"); - } - - @Override - public String getPrefix() { - return PREFIX; - } - - @Override - public boolean accepts(String spec, LocationRegistry registry) { - return BasicLocationRegistry.isResolverPrefixForSpec(this, spec, true); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public Location newLocationFromString(Map locationFlags, String spec, LocationRegistry registry) { - if (LOG.isDebugEnabled()) { - LOG.debug("Resolving location '" + spec + "' with flags " + Joiner.on(",").withKeyValueSeparator("=").join(locationFlags)); - } - String namedLocation = (String) locationFlags.get(LocationInternal.NAMED_SPEC_NAME.getName()); - - Matcher matcher = PATTERN.matcher(spec); - if (!matcher.matches()) { - String m = String.format("Invalid location '%s'; must specify either %s:entityId or %s:entityId:(key=argument)", - spec, PREFIX, PREFIX); - throw new IllegalArgumentException(m); - } - - String argsPart = matcher.group(4); - Map<String, String> argsMap = (argsPart != null) ? KeyValueParser.parseMap(argsPart) : Collections.<String,String>emptyMap(); - String displayNamePart = argsMap.get("displayName"); - String namePart = argsMap.get("name"); - - if (!ACCEPTABLE_ARGS.containsAll(argsMap.keySet())) { - Set<String> illegalArgs = Sets.difference(argsMap.keySet(), ACCEPTABLE_ARGS); - throw new IllegalArgumentException("Invalid location '"+spec+"'; illegal args "+illegalArgs+"; acceptable args are "+ACCEPTABLE_ARGS); - } - if (argsMap.containsKey("displayName") && Strings.isEmpty(displayNamePart)) { - throw new IllegalArgumentException("Invalid location '"+spec+"'; if displayName supplied then value must be non-empty"); - } - if (argsMap.containsKey("name") && Strings.isEmpty(namePart)) { - throw new IllegalArgumentException("Invalid location '"+spec+"'; if name supplied then value must be non-empty"); - } - - Map<String, Object> filteredProperties = new LocationPropertiesFromBrooklynProperties() - .getLocationProperties(PREFIX, namedLocation, registry.getProperties()); - MutableMap<String, Object> flags = MutableMap.<String, Object>builder() - .putAll(filteredProperties) - .putAll(locationFlags) - .build(); - - String poolId = matcher.group(2); - if (Strings.isBlank(poolId)) { - throw new IllegalArgumentException("Invalid location '"+spec+"'; pool's entity id must be non-empty"); - } - - final String displayName = displayNamePart != null ? displayNamePart : "Server Pool " + poolId; - final String locationName = namePart != null ? namePart : "serverpool-" + poolId; - - Entity pool = managementContext.getEntityManager().getEntity(poolId); - LocationSpec<ServerPoolLocation> locationSpec = LocationSpec.create(ServerPoolLocation.class) - .configure(flags) - .configure(DynamicLocation.OWNER, pool) - .configure(LocationInternal.NAMED_SPEC_NAME, locationName) - .displayName(displayName); - return managementContext.getLocationManager().createLocation(locationSpec); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/service/EntityLaunchListener.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/service/EntityLaunchListener.java b/software/base/src/main/java/brooklyn/entity/service/EntityLaunchListener.java deleted file mode 100644 index d7cab7b..0000000 --- a/software/base/src/main/java/brooklyn/entity/service/EntityLaunchListener.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.service; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.mgmt.ExecutionManager; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.api.sensor.SensorEvent; -import org.apache.brooklyn.api.sensor.SensorEventListener; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.EffectorCallTag; -import org.apache.brooklyn.entity.lifecycle.Lifecycle; -import org.apache.brooklyn.util.core.task.Tasks; - -public class EntityLaunchListener implements Runnable, SensorEventListener<Lifecycle> { - private static final String SSH_LAUNCH_TASK_PREFIX = "ssh: launching"; - private static final String LAUNCH_CHECK_SKIP_TAG = "system-service-update"; - - private final AtomicReference<Task<?>> launchTaskRef = new AtomicReference<Task<?>>(); - private final SystemServiceEnricher enricher; - - public EntityLaunchListener(SystemServiceEnricher enricher) { - this.enricher = checkNotNull(enricher, "enricher"); - } - - @Override - public void onEvent(SensorEvent<Lifecycle> event) { - if (event.getValue() == Lifecycle.RUNNING) { - Task<?>launchTask = getLatestLaunchTask(enricher.getEntity()); - if (launchTask != null) { - launchTaskRef.set(launchTask); - if (!launchTask.isDone()) { - launchTask.addListener(this, enricher.getEntityExecutionContext()); - } - if (launchTask.isDone()) { - run(); - } - } - } - } - - @Override - public void run() { - Task<?> launchTask = launchTaskRef.getAndSet(null); - if (launchTask == null) return; - if (launchTask.isError()) return; - enricher.onLaunched(launchTask); - } - - private Task<?> getLatestLaunchTask(Entity entity) { - Task<?> startEffector = null; - ExecutionManager executionmgr = enricher.getManagementContext().getExecutionManager(); - Set<Task<?>> entityTasks = BrooklynTaskTags.getTasksInEntityContext(executionmgr, entity); - for (Task<?> t : entityTasks) { - if (BrooklynTaskTags.isEffectorTask(t)) { - EffectorCallTag effectorTag = BrooklynTaskTags.getEffectorCallTag(t, false); - if (SystemServiceEnricher.LAUNCH_EFFECTOR_NAMES.contains(effectorTag.getEffectorName()) && - !BrooklynTaskTags.hasTag(t, LAUNCH_CHECK_SKIP_TAG)) { - if (startEffector == null || startEffector.getStartTimeUtc() < t.getStartTimeUtc()) { - startEffector = t; - } - BrooklynTaskTags.addTagDynamically(t, LAUNCH_CHECK_SKIP_TAG); - } - } - } - if (startEffector != null) { - Task<?> launchTask = findSshLaunchChild(startEffector); - if (launchTask != null) { - return launchTask; - } - } - return null; - } - - private Task<?> findSshLaunchChild(Task<?> t) { - Iterable<Task<?>> children = Tasks.children(t); - for (Task<?> c : children) { - if (c.getDisplayName().startsWith(SSH_LAUNCH_TASK_PREFIX)) { - return c; - } - } - for (Task<?> c : children) { - Task<?> launchTask = findSshLaunchChild(c); - if (launchTask != null) { - return launchTask; - } - } - return null; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/service/InitdServiceInstaller.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/service/InitdServiceInstaller.java b/software/base/src/main/java/brooklyn/entity/service/InitdServiceInstaller.java deleted file mode 100644 index 59fba2e..0000000 --- a/software/base/src/main/java/brooklyn/entity/service/InitdServiceInstaller.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.service; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.File; -import java.util.Map; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.api.objs.HasShortName; -import org.apache.brooklyn.api.sensor.Enricher; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.effector.core.EffectorTasks; -import org.apache.brooklyn.entity.core.Attributes; -import org.apache.brooklyn.entity.core.EntityInternal; - -import brooklyn.entity.basic.SoftwareProcess; - -import org.apache.brooklyn.location.basic.SshMachineLocation; -import org.apache.brooklyn.location.cloud.names.AbstractCloudMachineNamer; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.ResourceUtils; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.core.task.ssh.SshPutTaskWrapper; -import org.apache.brooklyn.util.core.task.ssh.SshTasks; -import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; -import org.apache.brooklyn.util.core.text.TemplateProcessor; -import org.apache.brooklyn.util.os.Os; -import org.apache.brooklyn.util.ssh.BashCommands; - - -public class InitdServiceInstaller implements SystemServiceInstaller { - private static final ConfigKey<String> SERVICE_TEMPLATE = ConfigKeys.newStringConfigKey( - "service.initd.service_template", "URL of the template to be used as the /etc/init.d service", "classpath:///brooklyn/entity/service/service.sh"); - - private final Entity entity; - private final Enricher enricher; - - public InitdServiceInstaller(Entity entity, Enricher enricher) { - this.entity = checkNotNull(entity, "entity"); - this.enricher = checkNotNull(enricher, "enricher"); - } - - @Override - public Task<?> getServiceInstallTask() { - ResourceUtils resource = new ResourceUtils(this); - String pidFile = entity.getAttribute(SoftwareProcess.PID_FILE); - String template = resource.getResourceAsString(enricher.config().get(SERVICE_TEMPLATE)); - String serviceName = getServiceName(); - SshMachineLocation sshMachine = EffectorTasks.getSshMachine(entity); - Map<String, Object> params = MutableMap.<String, Object>of( - "service.launch_script", Os.mergePaths(getRunDir(), getStartScriptName()), - "service.name", serviceName, - "service.user", sshMachine.getUser(), - "service.log_path", getLogLocation()); - if (pidFile != null) { - params.put("service.pid_file", pidFile); - } - String service = TemplateProcessor.processTemplateContents(template, (EntityInternal)entity, params); - String tmpServicePath = Os.mergePaths(getRunDir(), serviceName); - String servicePath = "/etc/init.d/" + serviceName; - SshPutTaskWrapper putServiceTask = SshTasks.newSshPutTaskFactory(sshMachine, tmpServicePath) - .contents(service) - .newTask(); - ProcessTaskWrapper<Integer> installServiceTask = SshTasks.newSshExecTaskFactory(sshMachine, - BashCommands.chain( - BashCommands.sudo("mv " + tmpServicePath + " " + servicePath), - BashCommands.sudo("chmod 0755 " + servicePath), - BashCommands.sudo("chkconfig --add " + serviceName), - BashCommands.sudo("chkconfig " + serviceName + " on"))) - .requiringExitCodeZero() - .newTask(); - - return Tasks.<Void>builder() - .name("install (init.d)") - .description("Install init.d service") - .add(putServiceTask) - .add(installServiceTask) - .build(); - } - - private String getServiceName() { - String serviceNameTemplate = enricher.config().get(SystemServiceEnricher.SERVICE_NAME); - return serviceNameTemplate - .replace("${id}", entity.getId()) - .replace("${entity_name}", getEntityName()); - } - - private CharSequence getEntityName() { - String name; - if (entity instanceof HasShortName) { - name = ((HasShortName)entity).getShortName(); - } else if (entity instanceof Entity) { - name = ((Entity)entity).getDisplayName(); - } else { - name = "brooklyn-service"; - } - return AbstractCloudMachineNamer.sanitize(name.toString()).toLowerCase(); - } - - private String getStartScriptName() { - return enricher.config().get(SystemServiceEnricher.LAUNCH_SCRIPT_NAME); - } - - private String getRunDir() { - return entity.getAttribute(SoftwareProcess.RUN_DIR); - } - - private String getLogLocation() { - String logFileLocation = entity.getAttribute(Attributes.LOG_FILE_LOCATION); - if (logFileLocation != null) { - return new File(logFileLocation).getParent(); - } else { - return "/tmp"; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/service/SystemServiceEnricher.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/service/SystemServiceEnricher.java b/software/base/src/main/java/brooklyn/entity/service/SystemServiceEnricher.java deleted file mode 100644 index 959d078..0000000 --- a/software/base/src/main/java/brooklyn/entity/service/SystemServiceEnricher.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.service; - -import java.util.Set; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.internal.EntityLocal; -import org.apache.brooklyn.api.mgmt.ExecutionContext; -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.api.sensor.Enricher; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedStream; -import org.apache.brooklyn.effector.core.EffectorTasks; -import org.apache.brooklyn.entity.core.Attributes; -import org.apache.brooklyn.entity.core.Entities; - -import brooklyn.entity.basic.SoftwareProcess; - -import org.apache.brooklyn.location.basic.SshMachineLocation; -import org.apache.brooklyn.sensor.enricher.AbstractEnricher; -import org.apache.brooklyn.util.core.task.BasicExecutionManager; -import org.apache.brooklyn.util.core.task.DynamicTasks; -import org.apache.brooklyn.util.core.task.TaskBuilder; -import org.apache.brooklyn.util.core.task.ssh.SshPutTaskWrapper; -import org.apache.brooklyn.util.core.task.ssh.SshTasks; -import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory; -import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; -import org.apache.brooklyn.util.net.Urls; - -import com.google.common.collect.ImmutableSet; - -public class SystemServiceEnricher extends AbstractEnricher implements Enricher { - public static final String DEFAULT_ENRICHER_UNIQUE_TAG = "systemService.tag"; - protected static final Set<String> LAUNCH_EFFECTOR_NAMES = ImmutableSet.of("start", "restart"); - public static final ConfigKey<String> LAUNCH_SCRIPT_NAME = ConfigKeys.newStringConfigKey( - "service.script_name", "The name of the launch script to be created in the runtime directory of the entity.", "service-launch.sh"); - public static final ConfigKey<String> SERVICE_NAME = ConfigKeys.newStringConfigKey( - "service.name", "The name of the system service. Can use ${entity_name} and ${id} variables to template the value.", "${entity_name}-${id}"); - - @Override - public void setEntity(EntityLocal entity) { - super.setEntity(entity); - subscribeLaunch(); - uniqueTag = DEFAULT_ENRICHER_UNIQUE_TAG; - } - - private void subscribeLaunch() { - subscribe(entity, Attributes.SERVICE_STATE_ACTUAL, new EntityLaunchListener(this)); - } - - public void onLaunched(Task<?> task) { - WrappedStream streamStdin = BrooklynTaskTags.stream(task, BrooklynTaskTags.STREAM_STDIN); - if (streamStdin == null) return; - - WrappedStream streamEnv = BrooklynTaskTags.stream(task, BrooklynTaskTags.STREAM_ENV); - String stdin = streamStdin.streamContents.get(); - String env = streamEnv.streamContents.get(); - - final SshMachineLocation sshMachine = EffectorTasks.getSshMachine(entity); - final String launchScriptPath = Urls.mergePaths(getRunDir(), getStartScriptName()); - - Task<Void> installerTask = TaskBuilder.<Void>builder() - .name("install (service)") - .description("Install as a system service") - .body(new Runnable() { - @Override - public void run() { - ProcessTaskFactory<Integer> taskFactory = SshTasks.newSshExecTaskFactory(sshMachine, "[ -e '" + launchScriptPath + "' ]") - .summary("check installed") - .allowingNonZeroExitCode(); - boolean isInstalled = DynamicTasks.queue(taskFactory).get() == 0; - if (!isInstalled) { - Task<?> serviceInstallTask = SystemServiceInstallerFactory.of(entity, SystemServiceEnricher.this).getServiceInstallTask(); - DynamicTasks.queue(serviceInstallTask); - } - } - }) - .build(); - - SshPutTaskWrapper updateLaunchScriptTask = SshTasks.newSshPutTaskFactory(sshMachine, launchScriptPath).contents(getLaunchScript(stdin, env)).newTask(); - ProcessTaskWrapper<Integer> makeExecutableTask = SshTasks.newSshExecTaskFactory(sshMachine, "chmod +x " + launchScriptPath) - .requiringExitCodeZero() - .newTask(); - Task<Void> udpateTask = TaskBuilder.<Void>builder() - .name("update-launch") - .description("Update launch script used by the system service") - .add(updateLaunchScriptTask) - .add(makeExecutableTask) - .build(); - - Task<Void> updateService = TaskBuilder.<Void>builder() - .name("update-system-service") - .description("Update system service") - .add(installerTask) - .add(udpateTask) - .tag(BrooklynTaskTags.tagForContextEntity(entity)) - .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) - .build(); - - submitTopLevel(updateService); - } - - private void submitTopLevel(Task<Void> updateService) { - Task<?> currentTask = BasicExecutionManager.getPerThreadCurrentTask().get(); - BasicExecutionManager.getPerThreadCurrentTask().set(null); - try { - Entities.submit(entity, updateService); - } finally { - BasicExecutionManager.getPerThreadCurrentTask().set(currentTask); - } - } - - private String getLaunchScript(String stdin, String env) { - // (?m) - multiline enable - // insert export at beginning of each line - return env.replaceAll("(?m)^", "export ") + "\n" + stdin; - } - - private String getRunDir() { - return entity.getAttribute(SoftwareProcess.RUN_DIR); - } - - private String getStartScriptName() { - return config().get(LAUNCH_SCRIPT_NAME); - } - - ExecutionContext getEntityExecutionContext() { - return getManagementContext().getExecutionContext(entity); - } - - protected Entity getEntity() { - return entity; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/service/SystemServiceInstaller.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/service/SystemServiceInstaller.java b/software/base/src/main/java/brooklyn/entity/service/SystemServiceInstaller.java deleted file mode 100644 index c5e6533..0000000 --- a/software/base/src/main/java/brooklyn/entity/service/SystemServiceInstaller.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.service; - -import org.apache.brooklyn.api.mgmt.Task; - -public interface SystemServiceInstaller { - Task<?> getServiceInstallTask(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/service/SystemServiceInstallerFactory.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/service/SystemServiceInstallerFactory.java b/software/base/src/main/java/brooklyn/entity/service/SystemServiceInstallerFactory.java deleted file mode 100644 index c7f5d9e..0000000 --- a/software/base/src/main/java/brooklyn/entity/service/SystemServiceInstallerFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.service; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.sensor.Enricher; - -public class SystemServiceInstallerFactory { - public static SystemServiceInstaller of(Entity entity, Enricher systemServiceEnricher) { - return new InitdServiceInstaller(entity, systemServiceEnricher); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/software/MachineInitTasks.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/brooklyn/entity/software/MachineInitTasks.java b/software/base/src/main/java/brooklyn/entity/software/MachineInitTasks.java deleted file mode 100644 index b9c43b8..0000000 --- a/software/base/src/main/java/brooklyn/entity/software/MachineInitTasks.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.software; - -import java.util.List; -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.mgmt.Task; -import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.entity.core.EntityInternal; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.Beta; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -import org.apache.brooklyn.location.basic.SshMachineLocation; -import org.apache.brooklyn.util.core.task.DynamicTasks; -import org.apache.brooklyn.util.core.task.Tasks; -import org.apache.brooklyn.util.core.task.ssh.SshTasks; -import org.apache.brooklyn.util.net.Protocol; -import org.apache.brooklyn.util.ssh.BashCommands; -import org.apache.brooklyn.util.ssh.IptablesCommands; -import org.apache.brooklyn.util.ssh.IptablesCommands.Chain; -import org.apache.brooklyn.util.ssh.IptablesCommands.Policy; -import org.apache.brooklyn.util.text.Strings; - -/** - * - */ -@Beta -public class MachineInitTasks { - - // TODO Move somewhere so code can also be called by JcloudsLocation! - - private static final Logger log = LoggerFactory.getLogger(MachineInitTasks.class); - - protected EntityInternal entity() { - return (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current()); - } - - /** - * Returns a queued {@link Task} which opens the given ports in iptables on the given machine. - */ - public Task<Void> openIptablesAsync(final Iterable<Integer> inboundPorts, final SshMachineLocation machine) { - return DynamicTasks.queue("open iptables "+toTruncatedString(inboundPorts, 6), new Callable<Void>() { - public Void call() { - openIptablesImpl(inboundPorts, machine); - return null; - } - }); - } - - /** - * Returns a queued {@link Task} which stops iptables on the given machine. - */ - public Task<Void> stopIptablesAsync(final SshMachineLocation machine) { - return DynamicTasks.queue("stop iptables", new Callable<Void>() { - public Void call() { - stopIptablesImpl(machine); - return null; - } - }); - } - - /** - * See docs in {@link BashCommands#dontRequireTtyForSudo()} - */ - public Task<Boolean> dontRequireTtyForSudoAsync(final SshMachineLocation machine) { - return DynamicTasks.queue(SshTasks.dontRequireTtyForSudo(machine, true).newTask().asTask()); - } - - protected void openIptablesImpl(Iterable<Integer> inboundPorts, SshMachineLocation machine) { - if (inboundPorts == null || Iterables.isEmpty(inboundPorts)) { - log.info("No ports to open in iptables (no inbound ports) for {} at {}", machine, this); - } else { - log.info("Opening ports in iptables for {} at {}", entity(), machine); - - List<String> iptablesRules = Lists.newArrayList(); - - if (isLocationFirewalldEnabled(machine)) { - for (Integer port : inboundPorts) { - iptablesRules.add(IptablesCommands.addFirewalldRule(Chain.INPUT, Protocol.TCP, port, Policy.ACCEPT)); - } - } else { - iptablesRules = createIptablesRulesForNetworkInterface(inboundPorts); - iptablesRules.add(IptablesCommands.saveIptablesRules()); - } - List<String> batch = Lists.newArrayList(); - // Some entities, such as Riak (erlang based) have a huge range of ports, which leads to a script that - // is too large to run (fails with a broken pipe). Batch the rules into batches of 50 - for (String rule : iptablesRules) { - batch.add(rule); - if (batch.size() == 50) { - machine.execCommands("Inserting iptables rules, 50 command batch", batch); - batch.clear(); - } - } - if (batch.size() > 0) { - machine.execCommands("Inserting iptables rules", batch); - } - machine.execCommands("List iptables rules", ImmutableList.of(IptablesCommands.listIptablesRule())); - } - } - - protected void stopIptablesImpl(SshMachineLocation machine) { - log.info("Stopping iptables for {} at {}", entity(), machine); - - List<String> cmds = ImmutableList.<String>of(); - if (isLocationFirewalldEnabled(machine)) { - cmds = ImmutableList.of(IptablesCommands.firewalldServiceStop(), IptablesCommands.firewalldServiceStatus()); - } else { - cmds = ImmutableList.of(IptablesCommands.iptablesServiceStop(), IptablesCommands.iptablesServiceStatus()); - } - machine.execCommands("Stopping iptables", cmds); - } - - private List<String> createIptablesRulesForNetworkInterface(Iterable<Integer> ports) { - List<String> iptablesRules = Lists.newArrayList(); - for (Integer port : ports) { - iptablesRules.add(IptablesCommands.insertIptablesRule(Chain.INPUT, Protocol.TCP, port, Policy.ACCEPT)); - } - return iptablesRules; - } - - public boolean isLocationFirewalldEnabled(SshMachineLocation location) { - int result = location.execCommands("checking if firewalld is active", - ImmutableList.of(IptablesCommands.firewalldServiceIsActive())); - if (result == 0) { - return true; - } - - return false; - } - - protected String toTruncatedString(Iterable<?> vals, int maxShown) { - StringBuilder result = new StringBuilder("["); - int shown = 0; - for (Object val : (vals == null ? ImmutableList.of() : vals)) { - if (shown != 0) { - result.append(", "); - } - if (shown < maxShown) { - result.append(Strings.toString(val)); - shown++; - } else { - result.append("..."); - break; - } - } - result.append("]"); - return result.toString(); - } -}
