http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunParameters.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunParameters.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunParameters.java new file mode 100644 index 0000000..c8742e4 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunParameters.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.brooklyn.policy.followthesun;

import java.util.LinkedHashSet;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.brooklyn.api.location.Location;

/**
 * Thresholds consulted by the follow-the-sun strategy when deciding whether an item
 * should be migrated towards the geography that sends it the most traffic.
 * <p>
 * All thresholds are plain public mutable fields; obtain an instance through
 * {@link #newDefault()} and adjust the fields as required.
 */
public class FollowTheSunParameters {

    private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunParameters.class);

    /** Not publicly constructible; use {@link #newDefault()}. */
    private FollowTheSunParameters() {}

    /**
     * Trigger for moving segment X from geo A to geo B:
     * where x is the total number of requests submitted in X across the CDM network,
     * and x_A is the number of requests from geo A, with A the most prolific geography
     * (arbitrarily chosen in case of ties, so it is recommended to choose at least a small
     * percent_majority or delta_above_percent_majority in addition to this field);
     * this parameter T defines a number such that x_A > T*x in order for X to be migrated to A
     * (but see also {@link #triggerDeltaAbovePercentTotal}, below).
     */
    public double triggerPercentTotal = 0.3;

    /**
     * Fields as above, and T as above;
     * this parameter T' defines a number such that x_A > T*x + T' in order for X to be migrated to A.
     */
    public double triggerDeltaAbovePercentTotal = 0;

    /**
     * Fields as above;
     * this parameter T defines a number such that x_A > T in order for X to be migrated to A.
     */
    public double triggerAbsoluteTotal = 2;

    /**
     * Fields as above, with x_B the number from a different geography B,
     * where A and B are the two most prolific requesters of X, and x_A >= x_B;
     * this parameter T defines a number such that x_A-x_B > T*x in order for X to be migrated to A.
     */
    public double triggerPercentMajority = 0.2;

    /** As the corresponding majority and total fields, with x_A-x_B on the LHS of the inequality. */
    public double triggerDeltaAbovePercentMajority = 1;

    /** As the corresponding majority and total fields, with x_A-x_B on the LHS of the inequality. */
    public double triggerAbsoluteMajority = 4;

    /** Locations whose items the strategy leaves alone. */
    public Set<Location> excludedLocations = new LinkedHashSet<Location>();

    /** Creates a parameter set carrying the default thresholds declared on the fields. */
    public static FollowTheSunParameters newDefault() {
        return new FollowTheSunParameters();
    }

    /**
     * Parses {@code text} as a double, falling back to {@code defaultValue} when the text
     * cannot be parsed (a warning is logged) or when it parses to NaN.
     */
    private static double parseDouble(String text, double defaultValue) {
        double result = defaultValue;
        try {
            double parsed = Double.parseDouble(text);
            if (!Double.isNaN(parsed)) {
                result = parsed;
            }
        } catch (Exception e) {
            LOG.warn("Illegal double value '"+text+"', using default "+defaultValue+": "+e, e);
        }
        return result;
    }

    /** Splits {@code csv} on commas; a null or blank input yields an empty array. */
    private static String[] parseCommaSeparatedList(String csv) {
        boolean hasContent = csv != null && csv.trim().length() != 0;
        if (hasContent) {
            return csv.split(",");
        }
        return new String[0];
    }

    /**
     * Decides whether an item should migrate to the location with the highest usage.
     *
     * @param highest     usage from the most prolific candidate location
     * @param total       total usage across all locations
     * @param nextHighest usage from the runner-up location; currently not consulted
     * @param current     usage from the item's present location
     * @return true only when none of the configured thresholds vetoes the move
     */
    public boolean isTriggered(double highest, double total, double nextHighest, double current) {
        //TODO more params about nextHighest vs current
        double lead = highest-current;
        double totalThreshold = total*triggerPercentTotal + triggerDeltaAbovePercentTotal;
        double majorityThreshold = total*triggerPercentMajority + triggerDeltaAbovePercentMajority;

        // Any single satisfied comparison vetoes the migration.
        boolean vetoed = highest <= current
                || highest < totalThreshold
                || highest < triggerAbsoluteTotal
                || lead < majorityThreshold
                || lead < triggerAbsoluteMajority;
        return !vetoed;
    }

    /** Renders the six numeric thresholds; the excluded locations are not included. */
    @Override
    public String toString() {
        return "Inter-geography policy params: percentTotal="+triggerPercentTotal
                +"; deltaAbovePercentTotal="+triggerDeltaAbovePercentTotal
                +"; absoluteTotal="+triggerAbsoluteTotal
                +"; percentMajority="+triggerPercentMajority
                +"; deltaAbovePercentMajority="+triggerDeltaAbovePercentMajority
                +"; absoluteMajority="+triggerAbsoluteMajority;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicy.java new file mode 100644 index 0000000..a02285e --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicy.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.brooklyn.policy.followthesun;

import static brooklyn.util.JavaGroovyEquivalents.elvis;
import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
import static com.google.common.base.Preconditions.checkArgument;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.basic.EntityLocal;
import org.apache.brooklyn.api.event.AttributeSensor;
import org.apache.brooklyn.api.event.Sensor;
import org.apache.brooklyn.api.event.SensorEvent;
import org.apache.brooklyn.api.event.SensorEventListener;
import org.apache.brooklyn.api.location.Location;
import org.apache.brooklyn.api.location.MachineProvisioningLocation;
import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
import org.apache.brooklyn.core.util.flags.SetFromFlag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.entity.basic.Attributes;

import org.apache.brooklyn.policy.followthesun.FollowTheSunPool.ContainerItemPair;
import org.apache.brooklyn.policy.loadbalancing.Movable;

import brooklyn.util.collections.MutableMap;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

    // removed from catalog because it cannot currently be configured via catalog mechanisms -
    // PolicySpec.create fails due to no no-arg constructor
    // TODO make model and parameters things which can be initialized from config then reinstate in catalog
//@Catalog(name="Follow the Sun", description="Policy for moving \"work\" around to follow the demand; "
//        + "the work can be any \"Movable\" entity")
/**
 * Policy attached to a {@link FollowTheSunPool}. It mirrors pool, container and item events
 * into a {@link FollowTheSunModel} and, after each change, schedules a run of
 * {@link FollowTheSunStrategy#rebalance()} on a private single-threaded scheduler.
 * Consecutive runs are spaced at least {@code minPeriodBetweenExecs} milliseconds apart,
 * and at most one run is queued at a time.
 */
public class FollowTheSunPolicy extends AbstractPolicy {

    private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunPolicy.class);

    public static final String NAME = "Follow the Sun (Inter-Geography Latency Optimization)";

    /** Minimum spacing, in milliseconds, between two rebalance executions. */
    @SetFromFlag(defaultVal="100")
    private long minPeriodBetweenExecs;

    /** Maps a container to the location used by the model; defaults to {@link #defaultLocationFinder}. */
    @SetFromFlag
    private Function<Entity, Location> locationFinder;

    private final AttributeSensor<Map<? extends Movable, Double>> itemUsageMetric;
    private final FollowTheSunModel<Entity, Movable> model;
    private final FollowTheSunStrategy<Entity, Movable> strategy;
    private final FollowTheSunParameters parameters;

    private FollowTheSunPool poolEntity;

    private volatile ScheduledExecutorService executor;
    /** True while a rebalance task is queued but has not yet started. */
    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
    /** Wall-clock time (ms) at which the most recent rebalance task started; 0 if none. */
    private volatile long executorTime = 0;
    private boolean loggedConstraintsIgnored = false;

    /**
     * Takes the entity's first location and walks up its parents until it reaches a
     * {@link MachineProvisioningLocation} or a location without a parent.
     * Returns null when the entity has no locations.
     */
    private final Function<Entity, Location> defaultLocationFinder = new Function<Entity, Location>() {
        public Location apply(Entity e) {
            Collection<Location> locs = e.getLocations();
            if (locs.isEmpty()) return null;
            Location contender = Iterables.get(locs, 0);
            while (contender.getParent() != null && !(contender instanceof MachineProvisioningLocation)) {
                contender = contender.getParent();
            }
            return contender;
        }
    };

    /** Dispatches every subscribed sensor event to the matching {@code on...} handler, requesting a rebalance. */
    private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
        @Override
        public void onEvent(SensorEvent<Object> event) {
            if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", FollowTheSunPolicy.this, event);
            Entity source = event.getSource();
            Object value = event.getValue();
            Sensor<?> sensor = event.getSensor();

            if (sensor.equals(itemUsageMetric)) {
                onItemMetricUpdated((Movable)source, (Map<? extends Movable, Double>) value, true);
            } else if (sensor.equals(Attributes.LOCATION_CHANGED)) {
                onContainerLocationUpdated(source, true);
            } else if (sensor.equals(FollowTheSunPool.CONTAINER_ADDED)) {
                onContainerAdded((Entity) value, true);
            } else if (sensor.equals(FollowTheSunPool.CONTAINER_REMOVED)) {
                onContainerRemoved((Entity) value, true);
            } else if (sensor.equals(FollowTheSunPool.ITEM_ADDED)) {
                onItemAdded((Movable) value, true);
            } else if (sensor.equals(FollowTheSunPool.ITEM_REMOVED)) {
                onItemRemoved((Movable) value, true);
            } else if (sensor.equals(FollowTheSunPool.ITEM_MOVED)) {
                ContainerItemPair pair = (ContainerItemPair) value;
                onItemMoved((Movable)pair.item, pair.container, true);
            }
        }
    };

    // FIXME parameters: use a more groovy way of doing it, that's consistent with other policies/entities?
    /** Creates the policy with no extra properties; see the four-argument constructor. */
    public FollowTheSunPolicy(AttributeSensor itemUsageMetric,
            FollowTheSunModel<Entity, Movable> model, FollowTheSunParameters parameters) {
        this(MutableMap.of(), itemUsageMetric, model, parameters);
    }

    /**
     * @param props           properties handed to the superclass constructor
     * @param itemUsageMetric sensor on each item whose value is forwarded to the model as usage
     * @param model           model kept up to date by this policy and read by the strategy
     * @param parameters      thresholds handed to the strategy
     */
    public FollowTheSunPolicy(Map props, AttributeSensor itemUsageMetric,
            FollowTheSunModel<Entity, Movable> model, FollowTheSunParameters parameters) {
        super(props);
        this.itemUsageMetric = itemUsageMetric;
        this.model = model;
        this.parameters = parameters;
        this.strategy = new FollowTheSunStrategy<Entity, Movable>(model, parameters); // TODO: extract interface, inject impl
        this.locationFinder = elvis(locationFinder, defaultLocationFinder);

        // TODO Should re-use the execution manager's thread pool, somehow
        executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
    }

    /**
     * Binds the policy to its pool: subscribes to the pool's notifications, records the
     * containers and items that already exist, then schedules a first rebalance.
     *
     * @throws IllegalArgumentException if {@code entity} is not a {@link FollowTheSunPool}
     */
    @Override
    public void setEntity(EntityLocal entity) {
        checkArgument(entity instanceof FollowTheSunPool, "Provided entity must be a FollowTheSunPool");
        super.setEntity(entity);
        this.poolEntity = (FollowTheSunPool) entity;

        // Detect when containers are added to or removed from the pool.
        subscribe(poolEntity, FollowTheSunPool.CONTAINER_ADDED, eventHandler);
        subscribe(poolEntity, FollowTheSunPool.CONTAINER_REMOVED, eventHandler);
        subscribe(poolEntity, FollowTheSunPool.ITEM_ADDED, eventHandler);
        subscribe(poolEntity, FollowTheSunPool.ITEM_REMOVED, eventHandler);
        subscribe(poolEntity, FollowTheSunPool.ITEM_MOVED, eventHandler);

        // Take heed of any extant containers.
        for (Entity container : poolEntity.getContainerGroup().getMembers()) {
            onContainerAdded(container, false);
        }
        for (Entity item : poolEntity.getItemGroup().getMembers()) {
            onItemAdded((Movable)item, false);
        }

        scheduleLatencyReductionJig();
    }

    /** Suspends the policy, shutting the scheduler down immediately and clearing the queued flag. */
    @Override
    public void suspend() {
        // TODO unsubscribe from everything? And resubscribe on resume?
        super.suspend();
        if (executor != null) executor.shutdownNow();
        executorQueued.set(false);
    }

    /** Resumes the policy with a fresh scheduler and reset scheduling state. */
    @Override
    public void resume() {
        super.resume();
        executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
        executorTime = 0;
        executorQueued.set(false);
    }

    private ThreadFactory newThreadFactory() {
        return new ThreadFactoryBuilder()
                .setNameFormat("brooklyn-followthesunpolicy-%d")
                .build();
    }

    /**
     * Queues a single rebalance run unless one is already queued or the policy is not running.
     * The delay is chosen so the run starts no sooner than {@code minPeriodBetweenExecs}
     * milliseconds after the previous run started.
     */
    private void scheduleLatencyReductionJig() {
        if (isRunning() && executorQueued.compareAndSet(false, true)) {
            long now = System.currentTimeMillis();
            long delay = Math.max(0, (executorTime + minPeriodBetweenExecs) - now);

            executor.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        executorTime = System.currentTimeMillis();
                        // Cleared before rebalancing so events arriving meanwhile can queue another run.
                        executorQueued.set(false);

                        // Log the policy itself; a bare 'this' here would be the anonymous Runnable.
                        if (LOG.isTraceEnabled()) LOG.trace("{} executing follow-the-sun migration-strategy", FollowTheSunPolicy.this);
                        strategy.rebalance();

                    } catch (RuntimeException e) {
                        if (isRunning()) {
                            LOG.error("Error during latency-reduction-jig", e);
                        } else {
                            LOG.debug("Error during latency-reduction-jig, but no longer running", e);
                        }
                    }
                }},
                delay,
                TimeUnit.MILLISECONDS);
        }
    }

    /** Subscribes to the container's location changes and records it, with its location, in the model. */
    private void onContainerAdded(Entity container, boolean rebalanceNow) {
        subscribe(container, Attributes.LOCATION_CHANGED, eventHandler);
        Location location = locationFinder.apply(container);

        if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of container {} in location {}", new Object[] {this, container, location});
        model.onContainerAdded(container, location);

        if (rebalanceNow) scheduleLatencyReductionJig();
    }

    /** Records the removal of a container in the model. */
    private void onContainerRemoved(Entity container, boolean rebalanceNow) {
        if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of container {}", this, container);
        model.onContainerRemoved(container);
        if (rebalanceNow) scheduleLatencyReductionJig();
    }

    /**
     * Subscribes to the item's usage metric and records the item, its current container,
     * its immovable flag and any usage value it already carries in the model.
     */
    private void onItemAdded(Movable item, boolean rebalanceNow) {
        Entity parentContainer = (Entity) item.getAttribute(Movable.CONTAINER);

        if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of item {} in container {}", new Object[] {this, item, parentContainer});

        subscribe(item, itemUsageMetric, eventHandler);

        // Update the model, including the current metric value (if any).
        Map<? extends Movable, Double> currentValue = item.getAttribute(itemUsageMetric);
        boolean immovable = (Boolean)elvis(item.getConfig(Movable.IMMOVABLE), false);
        model.onItemAdded(item, parentContainer, immovable);

        if (currentValue != null) {
            model.onItemUsageUpdated(item, currentValue);
        }

        if (rebalanceNow) scheduleLatencyReductionJig();
    }

    /** Drops all subscriptions to the item and records its removal in the model. */
    private void onItemRemoved(Movable item, boolean rebalanceNow) {
        if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of item {}", this, item);
        unsubscribe(item);
        model.onItemRemoved(item);
        if (rebalanceNow) scheduleLatencyReductionJig();
    }

    /** Records in the model that the item now lives in {@code parentContainer}. */
    private void onItemMoved(Movable item, Entity parentContainer, boolean rebalanceNow) {
        if (LOG.isTraceEnabled()) LOG.trace("{} recording moving of item {} to {}", new Object[] {this, item, parentContainer});
        model.onItemMoved(item, parentContainer);
        if (rebalanceNow) scheduleLatencyReductionJig();
    }

    /** Re-resolves the container's location and records it in the model. */
    private void onContainerLocationUpdated(Entity container, boolean rebalanceNow) {
        Location location = locationFinder.apply(container);
        if (LOG.isTraceEnabled()) LOG.trace("{} recording location for container {}, new value {}", new Object[] {this, container, location});
        model.onContainerLocationUpdated(container, location);
        if (rebalanceNow) scheduleLatencyReductionJig();
    }

    /** Forwards a new usage map for the item to the model. */
    private void onItemMetricUpdated(Movable item, Map<? extends Movable, Double> newValues, boolean rebalanceNow) {
        if (LOG.isTraceEnabled()) LOG.trace("{} recording usage update for item {}, new value {}", new Object[] {this, item, newValues});
        model.onItemUsageUpdated(item, newValues);
        if (rebalanceNow) scheduleLatencyReductionJig();
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + (groovyTruth(name) ? "("+name+")" : "");
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPool.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPool.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPool.java new file mode 100644 index 0000000..7dc668e --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPool.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.brooklyn.policy.followthesun;

import static com.google.common.base.Preconditions.checkNotNull;

import java.io.Serializable;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.entity.proxying.ImplementedBy;

import brooklyn.entity.trait.Resizable;
import brooklyn.event.basic.BasicNotificationSensor;

/**
 * Entity grouping a set of containers and a set of items, and publishing notifications
 * when containers or items are added, removed, or when an item moves between containers.
 */
@ImplementedBy(FollowTheSunPoolImpl.class)
public interface FollowTheSunPool extends Entity, Resizable {

    // FIXME Remove duplication from BalanceableWorkerPool?

    // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing
    // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`.

    /** Encapsulates an item and a container; emitted by sensors.
     */
    public static class ContainerItemPair implements Serializable {
        private static final long serialVersionUID = 1L;
        public final Entity container;
        public final Entity item;

        /** @throws NullPointerException if {@code item} is null; {@code container} may be null */
        public ContainerItemPair(Entity container, Entity item) {
            this.container = container;
            this.item = checkNotNull(item);
        }

        @Override
        public String toString() {
            return ""+item+" @ "+container;
        }
    }

    // Pool constituent notifications.
    public static BasicNotificationSensor<Entity> CONTAINER_ADDED = new BasicNotificationSensor<Entity>(
            Entity.class, "followthesun.container.added", "Container added");
    public static BasicNotificationSensor<Entity> CONTAINER_REMOVED = new BasicNotificationSensor<Entity>(
            Entity.class, "followthesun.container.removed", "Container removed");
    public static BasicNotificationSensor<Entity> ITEM_ADDED = new BasicNotificationSensor<Entity>(
            Entity.class, "followthesun.item.added", "Item added");
    public static BasicNotificationSensor<Entity> ITEM_REMOVED = new BasicNotificationSensor<Entity>(
            Entity.class, "followthesun.item.removed", "Item removed");
    public static BasicNotificationSensor<ContainerItemPair> ITEM_MOVED = new BasicNotificationSensor<ContainerItemPair>(
            ContainerItemPair.class, "followthesun.item.moved", "Item moved to the given container");

    /** Sets the groups whose members are treated as this pool's containers and items. */
    public void setContents(Group containerGroup, Group itemGroup);

    public Group getContainerGroup();

    public Group getItemGroup();
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java ---------------------------------------------------------------------- diff --git 
a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java new file mode 100644 index 0000000..b9c597e --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.brooklyn.policy.followthesun;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.event.Sensor;
import org.apache.brooklyn.api.event.SensorEvent;
import org.apache.brooklyn.api.event.SensorEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import brooklyn.entity.basic.AbstractEntity;
import brooklyn.entity.basic.AbstractGroup;
import brooklyn.entity.trait.Resizable;
import brooklyn.entity.trait.Startable;
import org.apache.brooklyn.policy.loadbalancing.Movable;

/**
 * Default {@link FollowTheSunPool}. It watches membership of the container and item groups,
 * the {@code SERVICE_UP} sensor of each container and the {@code CONTAINER} sensor of each
 * item, and re-publishes those changes as the pool's own notification sensors.
 */
public class FollowTheSunPoolImpl extends AbstractEntity implements FollowTheSunPool {

    // FIXME Remove duplication from BalanceableWorkerPool?

    // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing
    // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`.

    private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunPool.class);

    private Group containerGroup;
    private Group itemGroup;

    /** Containers currently announced through CONTAINER_ADDED and not yet through CONTAINER_REMOVED. */
    private final Set<Entity> containers = Collections.synchronizedSet(new HashSet<Entity>());
    /** Items currently announced through ITEM_ADDED and not yet through ITEM_REMOVED. */
    private final Set<Entity> items = Collections.synchronizedSet(new HashSet<Entity>());

    /**
     * Routes group-membership, service-up and item-container events to the matching handler.
     * Throws IllegalStateException for a membership event from an unknown group or for an
     * unexpected sensor.
     */
    private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
        @Override
        public void onEvent(SensorEvent<Object> event) {
            if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", FollowTheSunPoolImpl.this, event);
            Entity source = event.getSource();
            Object value = event.getValue();
            Sensor<?> sensor = event.getSensor();

            if (sensor.equals(AbstractGroup.MEMBER_ADDED)) {
                if (source.equals(containerGroup)) {
                    onContainerAdded((Entity) value);
                } else if (source.equals(itemGroup)) {
                    onItemAdded((Entity)value);
                } else {
                    throw new IllegalStateException("unexpected event source="+source);
                }
            } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) {
                if (source.equals(containerGroup)) {
                    onContainerRemoved((Entity) value);
                } else if (source.equals(itemGroup)) {
                    onItemRemoved((Entity) value);
                } else {
                    throw new IllegalStateException("unexpected event source="+source);
                }
            } else if (sensor.equals(Startable.SERVICE_UP)) {
                // TODO What if start has failed? Is there a sensor to indicate that?
                // A null value is treated as "not up", matching the check in onContainerAdded,
                // instead of failing with a NullPointerException on unboxing.
                if (Boolean.TRUE.equals(value)) {
                    onContainerUp(source);
                } else {
                    onContainerDown(source);
                }
            } else if (sensor.equals(Movable.CONTAINER)) {
                onItemMoved(source, (Entity) value);
            } else {
                throw new IllegalStateException("Unhandled event type "+sensor+": "+event);
            }
        }
    };

    public FollowTheSunPoolImpl() {
    }

    /**
     * Records the two groups, subscribes to their membership changes and processes the
     * members they already contain.
     */
    @Override
    public void setContents(Group containerGroup, Group itemGroup) {
        this.containerGroup = containerGroup;
        this.itemGroup = itemGroup;
        subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
        subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
        subscribe(itemGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
        subscribe(itemGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);

        // Process extant containers and items
        for (Entity existingContainer : containerGroup.getMembers()) {
            onContainerAdded(existingContainer);
        }
        for (Entity existingItem : itemGroup.getMembers()) {
            onItemAdded((Entity)existingItem);
        }
    }

    @Override
    public Group getContainerGroup() {
        return containerGroup;
    }

    @Override
    public Group getItemGroup() {
        return itemGroup;
    }

    /** Delegates to the container group's current size. */
    @Override
    public Integer getCurrentSize() {
        return containerGroup.getCurrentSize();
    }

    /**
     * Delegates to the container group when it is {@link Resizable}.
     *
     * @throws UnsupportedOperationException if the container group is not resizable
     */
    @Override
    public Integer resize(Integer desiredSize) {
        if (containerGroup instanceof Resizable) return ((Resizable) containerGroup).resize(desiredSize);

        throw new UnsupportedOperationException("Container group is not resizable");
    }


    /**
     * Starts watching the container's SERVICE_UP sensor; the container is announced at once
     * when it is not Startable or already reports SERVICE_UP as true.
     */
    private void onContainerAdded(Entity newContainer) {
        subscribe(newContainer, Startable.SERVICE_UP, eventHandler);
        if (!(newContainer instanceof Startable) || Boolean.TRUE.equals(newContainer.getAttribute(Startable.SERVICE_UP))) {
            onContainerUp(newContainer);
        }
    }

    /** Emits CONTAINER_ADDED the first time the container is seen as up. */
    private void onContainerUp(Entity newContainer) {
        if (containers.add(newContainer)) {
            emit(CONTAINER_ADDED, newContainer);
        }
    }

    /** Emits CONTAINER_REMOVED if the container had previously been announced. */
    private void onContainerDown(Entity oldContainer) {
        if (containers.remove(oldContainer)) {
            emit(CONTAINER_REMOVED, oldContainer);
        }
    }

    /** Stops watching the container and treats it as down. */
    private void onContainerRemoved(Entity oldContainer) {
        unsubscribe(oldContainer);
        onContainerDown(oldContainer);
    }

    /** On first sight of the item, watches its CONTAINER sensor and emits ITEM_ADDED. */
    private void onItemAdded(Entity item) {
        if (items.add(item)) {
            subscribe(item, Movable.CONTAINER, eventHandler);
            emit(ITEM_ADDED, item);
        }
    }

    /** For a known item, drops its subscriptions and emits ITEM_REMOVED. */
    private void onItemRemoved(Entity item) {
        if (items.remove(item)) {
            unsubscribe(item);
            emit(ITEM_REMOVED, item);
        }
    }

    /** Emits ITEM_MOVED carrying the item and its new container. */
    private void onItemMoved(Entity item, Entity container) {
        emit(ITEM_MOVED, new ContainerItemPair(container, item));
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunStrategy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunStrategy.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunStrategy.java new file mode 100644 index 0000000..68d6ae2 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunStrategy.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.brooklyn.policy.followthesun;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.location.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.brooklyn.policy.loadbalancing.Movable;

import com.google.common.collect.Iterables;

// TODO: extract interface
/**
 * Examines every item in a {@link FollowTheSunModel} and moves an item to a container in the
 * location that sends it the most traffic, when {@link FollowTheSunParameters#isTriggered}
 * says the gain over its current location is large enough.
 */
public class FollowTheSunStrategy<ContainerType extends Entity, ItemType extends Movable> {

    // This is a modified version of the InterGeographyLatencyPolicy (aka Follow-The-Sun) policy from Monterey v3.

    // TODO location constraints

    private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunStrategy.class);

    private final FollowTheSunParameters parameters;
    private final FollowTheSunModel<ContainerType,ItemType> model;
    private final String name;

    public FollowTheSunStrategy(FollowTheSunModel<ContainerType,ItemType> model, FollowTheSunParameters parameters) {
        this.model = model;
        this.parameters = parameters;
        this.name = model.getName();
    }

    /**
     * Runs one pass over all items in the model. Items in an excluded location, items that are
     * not moveable and items with an active migration are skipped. For each remaining item the
     * locations sending more traffic than the item's current location are tried from busiest
     * downwards until one offers an available container; the item is moved there if the
     * parameters' trigger fires. Failures are logged and never propagated.
     */
    public void rebalance() {
        try {
            Set<ItemType> items = model.getItems();
            Map<ItemType, Map<Location, Double>> directSendsToItemByLocation = model.getDirectSendsToItemByLocation();

            for (ItemType item : items) {
                String itemName = model.getName(item);
                Location activeLocation = model.getItemLocation(item);
                ContainerType activeContainer = model.getItemContainer(item);
                Map<Location, Double> sendsByLocation = directSendsToItemByLocation.get(item);
                if (sendsByLocation == null) sendsByLocation = Collections.emptyMap();

                if (parameters.excludedLocations.contains(activeLocation)) {
                    if (LOG.isTraceEnabled()) LOG.trace("Ignoring segment {} as it is in {}", itemName, activeLocation);
                    continue;
                }
                if (!model.isItemMoveable(item)) {
                    if (LOG.isDebugEnabled()) LOG.debug("POLICY {} skipping any migration of {}, it is not moveable", name, itemName);
                    continue;
                }
                if (model.hasActiveMigration(item)) {
                    LOG.info("POLICY {} skipping any migration of {}, it is involved in an active migration already", name, itemName);
                    continue;
                }

                double total = DefaultFollowTheSunModel.sum(sendsByLocation.values());

                if (LOG.isTraceEnabled()) LOG.trace("POLICY {} detected {} msgs/sec in {}, split up as: {}", new Object[] {name, total, itemName, sendsByLocation});

                // Traffic from the item's own location; absent means none.
                Double current = sendsByLocation.get(activeLocation);
                if (current == null) current=0d;

                // Candidate locations: only those strictly busier than the current one.
                List<WeightedObject<Location>> locationsWtd = new ArrayList<WeightedObject<Location>>();
                if (total > 0) {
                    for (Map.Entry<Location, Double> entry : sendsByLocation.entrySet()) {
                        Location l = entry.getKey();
                        Double d = entry.getValue();
                        // Skip null rates: unboxing one would throw and abort the pass for all remaining items.
                        if (d != null && d > current) locationsWtd.add(new WeightedObject<Location>(l, d));
                    }
                }
                // Order candidates according to WeightedObject's natural ordering, reversed.
                Collections.sort(locationsWtd);
                Collections.reverse(locationsWtd);

                // Take the first candidate that has an available container.
                double highestMsgRate = -1;
                Location highestLocation = null;
                ContainerType optimalContainerInHighest = null;
                while (!locationsWtd.isEmpty()) {
                    WeightedObject<Location> weightedObject = locationsWtd.remove(0);
                    highestMsgRate = weightedObject.getWeight();
                    highestLocation = weightedObject.getObject();
                    optimalContainerInHighest = findOptimal(model.getAvailableContainersFor(item, highestLocation));
                    if (optimalContainerInHighest != null) {
                        break;
                    }
                }
                if (optimalContainerInHighest == null) {
                    if (LOG.isDebugEnabled()) LOG.debug("POLICY {} detected {} is already in optimal permitted location ({} of {} msgs/sec)", new Object[] {name, itemName, highestMsgRate, total});
                    continue;
                }

                // Find the runner-up among the remaining candidates; fall back to the current rate.
                double nextHighestMsgRate = -1;
                ContainerType optimalContainerInNextHighest = null;
                while (!locationsWtd.isEmpty()) {
                    WeightedObject<Location> weightedObject = locationsWtd.remove(0);
                    nextHighestMsgRate = weightedObject.getWeight();
                    Location nextHighestLocation = weightedObject.getObject();
                    optimalContainerInNextHighest = findOptimal(model.getAvailableContainersFor(item, nextHighestLocation));
                    if (optimalContainerInNextHighest != null) {
                        break;
                    }
                }
                if (optimalContainerInNextHighest == null) {
                    nextHighestMsgRate = current;
                }

                if (parameters.isTriggered(highestMsgRate, total, nextHighestMsgRate, current)) {
                    LOG.info("POLICY "+name+" detected "+itemName+" should be in location "+highestLocation+" on "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec), migrating");
                    try {
                        if (activeContainer.equals(optimalContainerInHighest)) {
                            //shouldn't happen
                            LOG.warn("POLICY "+name+" detected "+itemName+" should move to "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec) but it is already there with "+current+" msgs/sec");
                        } else {
                            item.move(optimalContainerInHighest);
                            model.onItemMoved(item, optimalContainerInHighest);
                        }
                    } catch (Exception e) {
                        LOG.warn("POLICY "+name+" detected "+itemName+" should be on "+optimalContainerInHighest+", but can't move it: "+e, e);
                    }
                } else {
                    if (LOG.isTraceEnabled()) LOG.trace("POLICY "+name+" detected "+itemName+" need not move to "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec not much better than "+current+" at "+activeContainer+")");
                }
            }
        } catch (Exception e) {
            LOG.warn("Error in policy "+name+" (ignoring): "+e, e);
        }
    }

    /** Returns the first contender, or null when there are none. */
    private ContainerType findOptimal(Collection<ContainerType> contenders) {
        /*
         * TODO should choose the least loaded mediator. Currently chooses first available, and relies
         * on a load-balancer to move it again; would be good if these could share decision code so move
         * it to the right place immediately. e.g.
         *   policyUtil.findLeastLoadedMediator(nodesInLocation);
         */
        return (contenders.isEmpty() ? null : Iterables.get(contenders, 0));
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/WeightedObject.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/WeightedObject.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/WeightedObject.java new file mode 100644 index 0000000..e1caf1f --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/WeightedObject.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.followthesun; + +public class WeightedObject<T> implements Comparable<WeightedObject<T>>{ + + final T object; + final double weight; + + public WeightedObject(T obj, double weight) { + this.object = obj; + this.weight = weight; + } + + public T getObject() { + return object; + } + + public double getWeight() { + return weight; + } + + /** + * Note that equals and compareTo are not consistent: x.compareTo(y)==0 iff x.equals(y) is + * highly recommended in Java, but is not required. This can make TreeSet etc behave poorly...
+ */ + public int compareTo(WeightedObject<T> o) { + double diff = o.getWeight() - weight; + if (diff>0.0000000000000001) return -1; + if (diff<-0.0000000000000001) return 1; + return 0; + } + + @Override + /** true irrespective of weight */ + public boolean equals(Object obj) { + if (!(obj instanceof WeightedObject<?>)) return false; + if (getObject()==null) { + return ((WeightedObject<?>)obj).getObject() == null; + } else { + return getObject().equals( ((WeightedObject<?>)obj).getObject() ); + } + } + + @Override + public int hashCode() { + if (getObject()==null) return 234519078; + return getObject().hashCode(); + } + + @Override + public String toString() { + return ""+getObject()+"["+getWeight()+"]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java new file mode 100644 index 0000000..d7af68a --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.ha; + +import static brooklyn.util.time.Time.makeTimeStringRounded; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.core.policy.basic.AbstractPolicy; +import org.apache.brooklyn.core.util.flags.SetFromFlag; +import org.apache.brooklyn.core.util.task.BasicTask; +import org.apache.brooklyn.core.util.task.ScheduledTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.EntityInternal; + +import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor; + +import brooklyn.util.collections.MutableMap; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.reflect.TypeToken; + +public abstract class AbstractFailureDetector extends AbstractPolicy { + + // TODO Remove duplication from ServiceFailureDetector, particularly for the stabilisation delays. 
+ + private static final Logger LOG = LoggerFactory.getLogger(AbstractFailureDetector.class); + + private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100; + + public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newDurationConfigKey( + "failureDetector.pollPeriod", "", Duration.ONE_SECOND); + + @SetFromFlag("failedStabilizationDelay") + public static final ConfigKey<Duration> FAILED_STABILIZATION_DELAY = ConfigKeys.newDurationConfigKey( + "failureDetector.serviceFailedStabilizationDelay", + "Time period for which the health check consistently fails " + + "(e.g. doesn't report failed-ok-faled) before concluding failure.", + Duration.ZERO); + + @SetFromFlag("recoveredStabilizationDelay") + public static final ConfigKey<Duration> RECOVERED_STABILIZATION_DELAY = ConfigKeys.newDurationConfigKey( + "failureDetector.serviceRecoveredStabilizationDelay", + "Time period for which the health check succeeds continiually " + + "(e.g. doesn't report ok-failed-ok) before concluding recovered", + Duration.ZERO); + + @SuppressWarnings("serial") + public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_FAILED = ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {}, + "failureDetector.sensor.fail", "A sensor which will indicate failure when set", HASensors.ENTITY_FAILED); + + @SuppressWarnings("serial") + public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_RECOVERED = ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {}, + "failureDetector.sensor.recover", "A sensor which will indicate recovery from failure when set", HASensors.ENTITY_RECOVERED); + + public interface CalculatedStatus { + boolean isHealthy(); + String getDescription(); + } + + private final class PublishJob implements Runnable { + @Override public void run() { + try { + executorTime = System.currentTimeMillis(); + executorQueued.set(false); + + publishNow(); + + } catch (Exception e) { + if (isRunning()) { + LOG.error("Problem resizing: "+e, e); 
+ } else { + if (LOG.isDebugEnabled()) LOG.debug("Problem resizing, but no longer running: "+e, e); + } + } catch (Throwable t) { + LOG.error("Problem in service-failure-detector: "+t, t); + throw Exceptions.propagate(t); + } + } + } + + private final class HealthPoller implements Runnable { + @Override + public void run() { + checkHealth(); + } + } + + private final class HealthPollingTaskFactory implements Callable<Task<?>> { + @Override + public Task<?> call() { + BasicTask<Void> task = new BasicTask<Void>(new HealthPoller()); + BrooklynTaskTags.setTransient(task); + return task; + } + } + + protected static class BasicCalculatedStatus implements CalculatedStatus { + private boolean healthy; + private String description; + + public BasicCalculatedStatus(boolean healthy, String description) { + this.healthy = healthy; + this.description = description; + } + + @Override + public boolean isHealthy() { + return healthy; + } + + @Override + public String getDescription() { + return description; + } + } + + public enum LastPublished { + NONE, + FAILED, + RECOVERED; + } + + protected final AtomicReference<Long> stateLastGood = new AtomicReference<Long>(); + protected final AtomicReference<Long> stateLastFail = new AtomicReference<Long>(); + + protected Long currentFailureStartTime = null; + protected Long currentRecoveryStartTime = null; + + protected LastPublished lastPublished = LastPublished.NONE; + + private final AtomicBoolean executorQueued = new AtomicBoolean(false); + private volatile long executorTime = 0; + + private Callable<Task<?>> pollingTaskFactory = new HealthPollingTaskFactory(); + + private Task<?> scheduledTask; + + protected abstract CalculatedStatus calculateStatus(); + + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + + if (isRunning()) { + doStartPolling(); + } + } + + @Override + public void suspend() { + scheduledTask.cancel(true); + super.suspend(); + } + + @Override + public void resume() { + 
currentFailureStartTime = null; + currentRecoveryStartTime = null; + lastPublished = LastPublished.NONE; + executorQueued.set(false); + executorTime = 0; + + super.resume(); + doStartPolling(); + } + + @SuppressWarnings("unchecked") + protected void doStartPolling() { + if (scheduledTask == null || scheduledTask.isDone()) { + ScheduledTask task = new ScheduledTask(MutableMap.of("period", getPollPeriod(), "displayName", getTaskName()), pollingTaskFactory); + scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task); + } + } + + private String getTaskName() { + return getDisplayName(); + } + + protected Duration getPollPeriod() { + return getConfig(POLL_PERIOD); + } + + protected Duration getFailedStabilizationDelay() { + return getConfig(FAILED_STABILIZATION_DELAY); + } + + protected Duration getRecoveredStabilizationDelay() { + return getConfig(RECOVERED_STABILIZATION_DELAY); + } + + protected Sensor<FailureDescriptor> getSensorFailed() { + return getConfig(SENSOR_FAILED); + } + + protected Sensor<FailureDescriptor> getSensorRecovered() { + return getConfig(SENSOR_RECOVERED); + } + + private synchronized void checkHealth() { + CalculatedStatus status = calculateStatus(); + boolean healthy = status.isHealthy(); + long now = System.currentTimeMillis(); + + if (healthy) { + stateLastGood.set(now); + if (lastPublished == LastPublished.FAILED) { + if (currentRecoveryStartTime == null) { + LOG.info("{} check for {}, now recovering: {}", new Object[] {this, entity, getDescription(status)}); + currentRecoveryStartTime = now; + schedulePublish(); + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing recovering: {}", new Object[] {this, entity, getDescription(status)}); + } + } else { + if (currentFailureStartTime != null) { + LOG.info("{} check for {}, now healthy: {}", new Object[] {this, entity, getDescription(status)}); + currentFailureStartTime = null; + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still 
healthy: {}", new Object[] {this, entity, getDescription(status)}); + } + } + } else { + stateLastFail.set(now); + if (lastPublished != LastPublished.FAILED) { + if (currentFailureStartTime == null) { + LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)}); + currentFailureStartTime = now; + schedulePublish(); + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing failing: {}", new Object[] {this, entity, getDescription(status)}); + } + } else { + if (currentRecoveryStartTime != null) { + LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)}); + currentRecoveryStartTime = null; + } else { + if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still failed: {}", new Object[] {this, entity, getDescription(status)}); + } + } + } + } + + protected void schedulePublish() { + schedulePublish(0); + } + + @SuppressWarnings("unchecked") + protected void schedulePublish(long delay) { + if (isRunning() && executorQueued.compareAndSet(false, true)) { + long now = System.currentTimeMillis(); + delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now)); + if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay); + + Runnable job = new PublishJob(); + + ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask<Void>(job)); + ((EntityInternal)entity).getExecutionContext().submit(task); + } + } + + private synchronized void publishNow() { + if (!isRunning()) return; + + CalculatedStatus calculatedStatus = calculateStatus(); + boolean healthy = calculatedStatus.isHealthy(); + + Long lastUpTime = stateLastGood.get(); + Long lastDownTime = stateLastFail.get(); + long serviceFailedStabilizationDelay = getFailedStabilizationDelay().toMilliseconds(); + long serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay().toMilliseconds(); + long now = 
System.currentTimeMillis(); + + if (healthy) { + if (lastPublished == LastPublished.FAILED) { + // only publish if consistently up for serviceRecoveredStabilizationDelay + long currentRecoveryPeriod = getTimeDiff(now, currentRecoveryStartTime); + long sinceLastDownPeriod = getTimeDiff(now, lastDownTime); + if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay && sinceLastDownPeriod > serviceRecoveredStabilizationDelay) { + String description = getDescription(calculatedStatus); + LOG.warn("{} check for {}, publishing recovered: {}", new Object[] {this, entity, description}); + entity.emit(getSensorRecovered(), new HASensors.FailureDescriptor(entity, description)); + lastPublished = LastPublished.RECOVERED; + currentFailureStartTime = null; + } else { + long nextAttemptTime = Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, serviceRecoveredStabilizationDelay - sinceLastDownPeriod); + schedulePublish(nextAttemptTime); + } + } + } else { + if (lastPublished != LastPublished.FAILED) { + // only publish if consistently down for serviceFailedStabilizationDelay + long currentFailurePeriod = getTimeDiff(now, currentFailureStartTime); + long sinceLastUpPeriod = getTimeDiff(now, lastUpTime); + if (currentFailurePeriod > serviceFailedStabilizationDelay && sinceLastUpPeriod > serviceFailedStabilizationDelay) { + String description = getDescription(calculatedStatus); + LOG.warn("{} connectivity-check for {}, publishing failed: {}", new Object[] {this, entity, description}); + entity.emit(getSensorFailed(), new HASensors.FailureDescriptor(entity, description)); + lastPublished = LastPublished.FAILED; + currentRecoveryStartTime = null; + } else { + long nextAttemptTime = Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, serviceFailedStabilizationDelay - sinceLastUpPeriod); + schedulePublish(nextAttemptTime); + } + } + } + } + + protected String getDescription(CalculatedStatus status) { + Long lastUpTime = stateLastGood.get(); + Long 
lastDownTime = stateLastGood.get(); + Duration serviceFailedStabilizationDelay = getFailedStabilizationDelay(); + Duration serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay(); + + return String.format("%s; healthy=%s; timeNow=%s; lastUp=%s; lastDown=%s; lastPublished=%s; "+ + "currentFailurePeriod=%s; currentRecoveryPeriod=%s", + status.getDescription(), + status.isHealthy(), + Time.makeDateString(System.currentTimeMillis()), + (lastUpTime != null ? Time.makeDateString(lastUpTime) : "<never>"), + (lastDownTime != null ? Time.makeDateString(lastDownTime) : "<never>"), + lastPublished, + (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")", + (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")"); + } + + private long getTimeDiff(Long recent, Long previous) { + return (previous == null) ? recent : (recent - previous); + } + + private String getTimeStringSince(Long time) { + return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/ConditionalSuspendPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ConditionalSuspendPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ConditionalSuspendPolicy.java new file mode 100644 index 0000000..4bc1dc1 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ConditionalSuspendPolicy.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.ha; + +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.api.policy.Policy; +import org.apache.brooklyn.core.policy.basic.AbstractPolicy; +import org.apache.brooklyn.core.util.flags.SetFromFlag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.util.javalang.JavaClassNames; + +import com.google.common.base.Preconditions; + +public class ConditionalSuspendPolicy extends AbstractPolicy { + private static final Logger LOG = LoggerFactory.getLogger(ConditionalSuspendPolicy.class); + + @SetFromFlag("suppressSensor") + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static final ConfigKey<Sensor<?>> SUSPEND_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class, + "suppressSensor", "Sensor which will suppress the target policy", HASensors.CONNECTION_FAILED); + + @SetFromFlag("resetSensor") + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static final ConfigKey<Sensor<?>> RESUME_SENSOR = (ConfigKey) 
ConfigKeys.newConfigKey(Sensor.class, + "resetSensor", "Resume target policy when this sensor is observed", HASensors.CONNECTION_RECOVERED); + + @SetFromFlag("target") + public static final ConfigKey<Object> SUSPEND_TARGET = ConfigKeys.newConfigKey(Object.class, + "target", "The target policy to suspend. Either direct reference or the value of the suspendTarget config on a policy from the same entity."); + + @Override + public void setEntity(EntityLocal entity) { + super.setEntity(entity); + Object target = config().get(SUSPEND_TARGET); + Preconditions.checkNotNull(target, "Suspend target required"); + Preconditions.checkNotNull(getTargetPolicy(), "Can't find target policy set in " + SUSPEND_TARGET.getName() + ": " + target); + subscribe(); + uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+getConfig(SUSPEND_SENSOR).getName()+":"+getConfig(RESUME_SENSOR).getName(); + } + + private void subscribe() { + subscribe(entity, getConfig(SUSPEND_SENSOR), new SensorEventListener<Object>() { + @Override public void onEvent(final SensorEvent<Object> event) { + if (isRunning()) { + Policy target = getTargetPolicy(); + target.suspend(); + LOG.debug("Suspended policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue()); + } + } + + }); + subscribe(entity, getConfig(RESUME_SENSOR), new SensorEventListener<Object>() { + @Override public void onEvent(final SensorEvent<Object> event) { + if (isRunning()) { + Policy target = getTargetPolicy(); + target.resume(); + LOG.debug("Resumed policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue()); + } + } + }); + } + + private Policy getTargetPolicy() { + Object target = config().get(SUSPEND_TARGET); + if (target instanceof Policy) { + return (Policy)target; + } else if (target instanceof String) { + for (Policy policy : entity.getPolicies()) { + // No way to set config values for keys NOT declared in the policy, + // so must use displayName as a generally available config 
value. + if (target.equals(policy.getDisplayName()) || target.equals(policy.getClass().getName())) { + return policy; + } + } + } else { + throw new IllegalStateException("Unexpected type " + target.getClass() + " for target " + target); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetector.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetector.java new file mode 100644 index 0000000..2cca77f --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetector.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.ha; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.BasicNotificationSensor; + +import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor; + +import brooklyn.util.guava.Maybe; +import brooklyn.util.net.Networking; +import brooklyn.util.time.Duration; + +import com.google.common.net.HostAndPort; + +/** + * Monitors a given {@link HostAndPort}, to emit HASensors.CONNECTION_FAILED and HASensors.CONNECTION_RECOVERED + * if the connection is lost/restored. + */ +@Catalog(name="Connection Failure Detector", description="HA policy for monitoring a host:port, " + + "emitting an event if the connection is lost/restored") +public class ConnectionFailureDetector extends AbstractFailureDetector { + + public static final ConfigKey<HostAndPort> ENDPOINT = ConfigKeys.newConfigKey(HostAndPort.class, "connectionFailureDetector.endpoint"); + + public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newConfigKey(Duration.class, "connectionFailureDetector.pollPeriod", "", Duration.ONE_SECOND); + + public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED; + + public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED; + + @SetFromFlag("connectionFailedStabilizationDelay") + public static final ConfigKey<Duration> CONNECTION_FAILED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class) + .name("connectionFailureDetector.serviceFailedStabilizationDelay") + .description("Time period for which the connection must be consistently down for " + + "(e.g. doesn't report down-up-down) before concluding failure. 
" + + "Note that long TCP timeouts mean there can be long (e.g. 70 second) " + + "delays in noticing a connection refused condition.") + .defaultValue(Duration.ZERO) + .build(); + + @SetFromFlag("connectionRecoveredStabilizationDelay") + public static final ConfigKey<Duration> CONNECTION_RECOVERED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class) + .name("connectionFailureDetector.serviceRecoveredStabilizationDelay") + .description("For a failed connection, time period for which the connection must be consistently up for (e.g. doesn't report up-down-up) before concluding recovered") + .defaultValue(Duration.ZERO) + .build(); + + @Override + public void init() { + super.init(); + getRequiredConfig(ENDPOINT); // just to confirm it's set, failing fast + if (config().getRaw(SENSOR_FAILED).isAbsent()) { + config().set(SENSOR_FAILED, CONNECTION_FAILED); + } + if (config().getRaw(SENSOR_RECOVERED).isAbsent()) { + config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED); + } + } + + @Override + protected CalculatedStatus calculateStatus() { + HostAndPort endpoint = getConfig(ENDPOINT); + boolean isHealthy = Networking.isReachable(endpoint); + return new BasicCalculatedStatus(isHealthy, "endpoint=" + endpoint); + } + + //Persistence compatibility overrides + @Override + protected Duration getPollPeriod() { + return getConfig(POLL_PERIOD); + } + + @Override + protected Duration getFailedStabilizationDelay() { + return getConfig(CONNECTION_FAILED_STABILIZATION_DELAY); + } + + @Override + protected Duration getRecoveredStabilizationDelay() { + return getConfig(CONNECTION_RECOVERED_STABILIZATION_DELAY); + } + + @SuppressWarnings("unchecked") + @Override + protected Sensor<FailureDescriptor> getSensorFailed() { + Maybe<Object> sensorFailed = config().getRaw(SENSOR_FAILED); + if (sensorFailed.isPresent()) { + return (Sensor<FailureDescriptor>)sensorFailed.get(); + } else { + return CONNECTION_FAILED; + } + } + + @SuppressWarnings("unchecked") + @Override + protected 
Sensor<FailureDescriptor> getSensorRecovered() { + Maybe<Object> sensorRecovered = config().getRaw(SENSOR_RECOVERED); + if (sensorRecovered.isPresent()) { + return (Sensor<FailureDescriptor>)sensorRecovered.get(); + } else { + return CONNECTION_RECOVERED; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/HASensors.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/HASensors.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/HASensors.java new file mode 100644 index 0000000..24763f0 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/HASensors.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.ha; + +import brooklyn.event.basic.BasicNotificationSensor; + +import com.google.common.base.Objects; + +public class HASensors { + + public static final BasicNotificationSensor<FailureDescriptor> ENTITY_FAILED = new BasicNotificationSensor<FailureDescriptor>( + FailureDescriptor.class, "ha.entityFailed", "Indicates that an entity has failed"); + + public static final BasicNotificationSensor<FailureDescriptor> ENTITY_RECOVERED = new BasicNotificationSensor<FailureDescriptor>( + FailureDescriptor.class, "ha.entityRecovered", "Indicates that a previously failed entity has recovered"); + + public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = new BasicNotificationSensor<FailureDescriptor>( + FailureDescriptor.class, "ha.connectionFailed", "Indicates that a connection has failed"); + + public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = new BasicNotificationSensor<FailureDescriptor>( + FailureDescriptor.class, "ha.connectionRecovered", "Indicates that a previously failed connection has recovered"); + + // TODO How to make this serializable with the entity reference + public static class FailureDescriptor { + private final Object component; + private final String description; + + public FailureDescriptor(Object component, String description) { + this.component = component; + this.description = description; + } + + public Object getComponent() { + return component; + } + + public String getDescription() { + return description; + } + + @Override + public String toString() { + return Objects.toStringHelper(this).add("component", component).add("description", description).toString(); + } + } +}
