http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunParameters.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunParameters.java
 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunParameters.java
new file mode 100644
index 0000000..c8742e4
--- /dev/null
+++ 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunParameters.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.followthesun;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.location.Location;
+
+public class FollowTheSunParameters {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FollowTheSunParameters.class);
+
+    private FollowTheSunParameters() {}
+
+    /** trigger for moving segment X from geo A to geo B:
+     * where x is total number of requests submitted in X across the CDM 
network,
+     * and x_A is number of reqs from geo A, with A the most prolific geography
+     * (arbitrarily chosen in case of ties so recommended to choose at least a 
small percent_majority or delta_above_percent_majority, in addition to this 
field);
+     * this parameter T defines a number such that x_A > T*x in order for X to 
be migrated to A
+     * (but see also DELTA_ABOVE_PERCENT_TOTAL, below) */
+    public double triggerPercentTotal = 0.3;
+    /** fields as above, and T as above,
+     * this parameter T' defines a number such that x_A > T*x + T' in order 
for X to be migrated to A */
+    public double triggerDeltaAbovePercentTotal = 0;
+    /** fields as above,
+     * this parameter T defines a number such that x_A > T in order for X to 
be migrated to A */
+    public double triggerAbsoluteTotal = 2;
+
+    /** fields as above, with X_B the number from a different geography B,
+     * where A and B are the two most prolific requesters of X, and X_A >= X_B;
+     * this parameter T defines a number such that x_A-x_B > T*x in order for 
X to be migrated to A */
+    public double triggerPercentMajority = 0.2;
+    /** as corresponding majority and total fields, with x_A-x_B on the LHS of 
inequality */
+    public double triggerDeltaAbovePercentMajority = 1;
+    /** as corresponding majority and total fields, with x_A-x_B on the LHS of 
inequality */
+    public double triggerAbsoluteMajority = 4;
+    
+    /** a list of excluded locations */
+    public Set<Location> excludedLocations = new LinkedHashSet<Location>();
+
+    public static FollowTheSunParameters newDefault() {
+        return new FollowTheSunParameters();
+    }
+
+    private static double parseDouble(String text, double defaultValue) {
+        try {
+            double d = Double.parseDouble(text);
+            if (!Double.isNaN(d)) return d;
+        } catch (Exception e) {
+            LOG.warn("Illegal double value '"+text+"', using default 
"+defaultValue+": "+e, e);
+        }
+        return defaultValue;
+    }
+
+    private static String[] parseCommaSeparatedList(String csv) {
+        if (csv==null || csv.trim().length()==0) return new String[0];
+        return csv.split(",");
+    }
+
+    public boolean isTriggered(double highest, double total, double 
nextHighest, double current) {
+        if (highest <= current) return false;
+        if (highest < total*triggerPercentTotal + 
triggerDeltaAbovePercentTotal) return false;
+        if (highest < triggerAbsoluteTotal) return false;
+        //TODO more params about nextHighest vs current
+        if (highest-current < total*triggerPercentMajority + 
triggerDeltaAbovePercentMajority) return false;
+        if (highest-current < triggerAbsoluteMajority) return false;
+        return true;
+    }
+    
+    public String toString() {
+        return "Inter-geography policy params: 
percentTotal="+triggerPercentTotal+"; 
deltaAbovePercentTotal="+triggerDeltaAbovePercentTotal+
+                "; absoluteTotal="+triggerAbsoluteTotal+"; 
percentMajority="+triggerPercentMajority+
+                "; 
deltaAbovePercentMajority="+triggerDeltaAbovePercentMajority+"; 
absoluteMajority="+triggerAbsoluteMajority;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicy.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicy.java
 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicy.java
new file mode 100644
index 0000000..a02285e
--- /dev/null
+++ 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPolicy.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.followthesun;
+
+import static brooklyn.util.JavaGroovyEquivalents.elvis;
+import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.MachineProvisioningLocation;
+import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.Attributes;
+
+import 
org.apache.brooklyn.policy.followthesun.FollowTheSunPool.ContainerItemPair;
+import org.apache.brooklyn.policy.loadbalancing.Movable;
+
+import brooklyn.util.collections.MutableMap;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+    // removed from catalog because it cannot currently be configured via 
catalog mechanisms - 
+    // PolicySpec.create fails due to no no-arg constructor
+    // TODO make model and parameters things which can be initialized from 
config then reinstate in catalog
+//@Catalog(name="Follow the Sun", description="Policy for moving \"work\" 
around to follow the demand; "
+//        + "the work can be any \"Movable\" entity")
+public class FollowTheSunPolicy extends AbstractPolicy {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FollowTheSunPolicy.class);
+
+    public static final String NAME = "Follow the Sun (Inter-Geography Latency 
Optimization)";
+
+    @SetFromFlag(defaultVal="100")
+    private long minPeriodBetweenExecs;
+    
+    @SetFromFlag
+    private Function<Entity, Location> locationFinder;
+    
+    private final AttributeSensor<Map<? extends Movable, Double>> 
itemUsageMetric;
+    private final FollowTheSunModel<Entity, Movable> model;
+    private final FollowTheSunStrategy<Entity, Movable> strategy;
+    private final FollowTheSunParameters parameters;
+    
+    private FollowTheSunPool poolEntity;
+    
+    private volatile ScheduledExecutorService executor;
+    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
+    private volatile long executorTime = 0;
+    private boolean loggedConstraintsIgnored = false;
+    
+    private final Function<Entity, Location> defaultLocationFinder = new 
Function<Entity, Location>() {
+        public Location apply(Entity e) {
+            Collection<Location> locs = e.getLocations();
+            if (locs.isEmpty()) return null;
+            Location contender = Iterables.get(locs, 0);
+            while (contender.getParent() != null && !(contender instanceof 
MachineProvisioningLocation)) {
+                contender = contender.getParent();
+            }
+            return contender;
+        }
+    };
+    
+    private final SensorEventListener<Object> eventHandler = new 
SensorEventListener<Object>() {
+        @Override
+        public void onEvent(SensorEvent<Object> event) {
+            if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", 
FollowTheSunPolicy.this, event);
+            Entity source = event.getSource();
+            Object value = event.getValue();
+            Sensor<?> sensor = event.getSensor();
+            
+            if (sensor.equals(itemUsageMetric)) {
+                onItemMetricUpdated((Movable)source, (Map<? extends Movable, 
Double>) value, true);
+            } else if (sensor.equals(Attributes.LOCATION_CHANGED)) {
+                onContainerLocationUpdated(source, true);
+            } else if (sensor.equals(FollowTheSunPool.CONTAINER_ADDED)) {
+                onContainerAdded((Entity) value, true);
+            } else if (sensor.equals(FollowTheSunPool.CONTAINER_REMOVED)) {
+                onContainerRemoved((Entity) value, true);
+            } else if (sensor.equals(FollowTheSunPool.ITEM_ADDED)) {
+                onItemAdded((Movable) value, true);
+            } else if (sensor.equals(FollowTheSunPool.ITEM_REMOVED)) {
+                onItemRemoved((Movable) value, true);
+            } else if (sensor.equals(FollowTheSunPool.ITEM_MOVED)) {
+                ContainerItemPair pair = (ContainerItemPair) value;
+                onItemMoved((Movable)pair.item, pair.container, true);
+            }
+        }
+    };
+    
+    // FIXME parameters: use a more groovy way of doing it, that's consistent 
with other policies/entities?
+    public FollowTheSunPolicy(AttributeSensor itemUsageMetric, 
+            FollowTheSunModel<Entity, Movable> model, FollowTheSunParameters 
parameters) {
+        this(MutableMap.of(), itemUsageMetric, model, parameters);
+    }
+    
+    public FollowTheSunPolicy(Map props, AttributeSensor itemUsageMetric, 
+            FollowTheSunModel<Entity, Movable> model, FollowTheSunParameters 
parameters) {
+        super(props);
+        this.itemUsageMetric = itemUsageMetric;
+        this.model = model;
+        this.parameters = parameters;
+        this.strategy = new FollowTheSunStrategy<Entity, Movable>(model, 
parameters); // TODO: extract interface, inject impl
+        this.locationFinder = elvis(locationFinder, defaultLocationFinder);
+        
+        // TODO Should re-use the execution manager's thread pool, somehow
+        executor = 
Executors.newSingleThreadScheduledExecutor(newThreadFactory());
+    }
+    
+    @Override
+    public void setEntity(EntityLocal entity) {
+        checkArgument(entity instanceof FollowTheSunPool, "Provided entity 
must be a FollowTheSunPool");
+        super.setEntity(entity);
+        this.poolEntity = (FollowTheSunPool) entity;
+        
+        // Detect when containers are added to or removed from the pool.
+        subscribe(poolEntity, FollowTheSunPool.CONTAINER_ADDED, eventHandler);
+        subscribe(poolEntity, FollowTheSunPool.CONTAINER_REMOVED, 
eventHandler);
+        subscribe(poolEntity, FollowTheSunPool.ITEM_ADDED, eventHandler);
+        subscribe(poolEntity, FollowTheSunPool.ITEM_REMOVED, eventHandler);
+        subscribe(poolEntity, FollowTheSunPool.ITEM_MOVED, eventHandler);
+        
+        // Take heed of any extant containers.
+        for (Entity container : poolEntity.getContainerGroup().getMembers()) {
+            onContainerAdded(container, false);
+        }
+        for (Entity item : poolEntity.getItemGroup().getMembers()) {
+            onItemAdded((Movable)item, false);
+        }
+
+        scheduleLatencyReductionJig();
+    }
+    
+    @Override
+    public void suspend() {
+        // TODO unsubscribe from everything? And resubscribe on resume?
+        super.suspend();
+        if (executor != null) executor.shutdownNow();
+        executorQueued.set(false);
+    }
+    
+    @Override
+    public void resume() {
+        super.resume();
+        executor = 
Executors.newSingleThreadScheduledExecutor(newThreadFactory());
+        executorTime = 0;
+        executorQueued.set(false);
+    }
+    
+    private ThreadFactory newThreadFactory() {
+        return new ThreadFactoryBuilder()
+                .setNameFormat("brooklyn-followthesunpolicy-%d")
+                .build();
+    }
+
+    private void scheduleLatencyReductionJig() {
+        if (isRunning() && executorQueued.compareAndSet(false, true)) {
+            long now = System.currentTimeMillis();
+            long delay = Math.max(0, (executorTime + minPeriodBetweenExecs) - 
now);
+            
+            executor.schedule(new Runnable() {
+                public void run() {
+                    try {
+                        executorTime = System.currentTimeMillis();
+                        executorQueued.set(false);
+                        
+                        if (LOG.isTraceEnabled()) LOG.trace("{} executing 
follow-the-sun migration-strategy", this);
+                        strategy.rebalance();
+                        
+                    } catch (RuntimeException e) {
+                        if (isRunning()) {
+                            LOG.error("Error during latency-reduction-jig", e);
+                        } else {
+                            LOG.debug("Error during latency-reduction-jig, but 
no longer running", e);
+                        }
+                    }
+                }},
+                delay,
+                TimeUnit.MILLISECONDS);
+        }
+    }
+    
+    private void onContainerAdded(Entity container, boolean rebalanceNow) {
+        subscribe(container, Attributes.LOCATION_CHANGED, eventHandler);
+        Location location = locationFinder.apply(container);
+        
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of 
container {} in location {}", new Object[] {this, container, location});
+        model.onContainerAdded(container, location);
+        
+        if (rebalanceNow) scheduleLatencyReductionJig();
+    }
+    
+    private void onContainerRemoved(Entity container, boolean rebalanceNow) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of container 
{}", this, container);
+        model.onContainerRemoved(container);
+        if (rebalanceNow) scheduleLatencyReductionJig();
+    }
+    
+    private void onItemAdded(Movable item, boolean rebalanceNow) {
+        Entity parentContainer = (Entity) item.getAttribute(Movable.CONTAINER);
+        
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of item {} 
in container {}", new Object[] {this, item, parentContainer});
+        
+        subscribe(item, itemUsageMetric, eventHandler);
+        
+        // Update the model, including the current metric value (if any).
+        Map<? extends Movable, Double> currentValue = 
item.getAttribute(itemUsageMetric);
+        boolean immovable = (Boolean)elvis(item.getConfig(Movable.IMMOVABLE), 
false);
+        model.onItemAdded(item, parentContainer, immovable);
+
+        if (currentValue != null) {
+            model.onItemUsageUpdated(item, currentValue);
+        }
+        
+        if (rebalanceNow) scheduleLatencyReductionJig();
+    }
+    
+    private void onItemRemoved(Movable item, boolean rebalanceNow) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of item {}", 
this, item);
+        unsubscribe(item);
+        model.onItemRemoved(item);
+        if (rebalanceNow) scheduleLatencyReductionJig();
+    }
+    
+    private void onItemMoved(Movable item, Entity parentContainer, boolean 
rebalanceNow) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording moving of item {} to 
{}", new Object[] {this, item, parentContainer});
+        model.onItemMoved(item, parentContainer);
+        if (rebalanceNow) scheduleLatencyReductionJig();
+    }
+    
+    private void onContainerLocationUpdated(Entity container, boolean 
rebalanceNow) {
+        Location location = locationFinder.apply(container);
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording location for 
container {}, new value {}", new Object[] {this, container, location});
+        model.onContainerLocationUpdated(container, location);
+        if (rebalanceNow) scheduleLatencyReductionJig();
+    }
+    
+    private void onItemMetricUpdated(Movable item, Map<? extends Movable, 
Double> newValues, boolean rebalanceNow) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording usage update for 
item {}, new value {}", new Object[] {this, item, newValues});
+        model.onItemUsageUpdated(item, newValues);
+        if (rebalanceNow) scheduleLatencyReductionJig();
+    }
+    
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + (groovyTruth(name) ? "("+name+")" 
: "");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPool.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPool.java
 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPool.java
new file mode 100644
index 0000000..7dc668e
--- /dev/null
+++ 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPool.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.followthesun;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+
+import brooklyn.entity.trait.Resizable;
+import brooklyn.event.basic.BasicNotificationSensor;
+
+@ImplementedBy(FollowTheSunPoolImpl.class)
+public interface FollowTheSunPool extends Entity, Resizable {
+
+    // FIXME Remove duplication from BalanceableWorkerPool?
+
+    // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and 
ITEM_REMOVED in loadbalancing
+    // are of type ContainerItemPair, but in followTheSun it is just the 
`Entity item`.
+    
+    /** Encapsulates an item and a container; emitted by sensors.
+     */
+    public static class ContainerItemPair implements Serializable {
+        private static final long serialVersionUID = 1L;
+        public final Entity container;
+        public final Entity item;
+
+        public ContainerItemPair(Entity container, Entity item) {
+            this.container = container;
+            this.item = checkNotNull(item);
+        }
+
+        @Override
+        public String toString() {
+            return ""+item+" @ "+container;
+        }
+    }
+
+    // Pool constituent notifications.
+    public static BasicNotificationSensor<Entity> CONTAINER_ADDED = new 
BasicNotificationSensor<Entity>(
+            Entity.class, "followthesun.container.added", "Container added");
+    public static BasicNotificationSensor<Entity> CONTAINER_REMOVED = new 
BasicNotificationSensor<Entity>(
+            Entity.class, "followthesun.container.removed", "Container 
removed");
+    public static BasicNotificationSensor<Entity> ITEM_ADDED = new 
BasicNotificationSensor<Entity>(
+            Entity.class, "followthesun.item.added", "Item added");
+    public static BasicNotificationSensor<Entity> ITEM_REMOVED = new 
BasicNotificationSensor<Entity>(
+            Entity.class, "followthesun.item.removed", "Item removed");
+    public static BasicNotificationSensor<ContainerItemPair> ITEM_MOVED = new 
BasicNotificationSensor<ContainerItemPair>(
+            ContainerItemPair.class, "followthesun.item.moved", "Item moved to 
the given container");
+
+    public void setContents(Group containerGroup, Group itemGroup);
+
+    public Group getContainerGroup();
+
+    public Group getItemGroup();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java
 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java
new file mode 100644
index 0000000..b9c597e
--- /dev/null
+++ 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.followthesun;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.AbstractGroup;
+import brooklyn.entity.trait.Resizable;
+import brooklyn.entity.trait.Startable;
+import org.apache.brooklyn.policy.loadbalancing.Movable;
+
+public class FollowTheSunPoolImpl extends AbstractEntity implements 
FollowTheSunPool {
+
+    // FIXME Remove duplication from BalanceableWorkerPool?
+
+    // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and 
ITEM_REMOVED in loadbalancing
+    // are of type ContainerItemPair, but in followTheSun it is just the 
`Entity item`.
+    
+    private static final Logger LOG = 
LoggerFactory.getLogger(FollowTheSunPool.class);
+
+    private Group containerGroup;
+    private Group itemGroup;
+
+    private final Set<Entity> containers = Collections.synchronizedSet(new 
HashSet<Entity>());
+    private final Set<Entity> items = Collections.synchronizedSet(new 
HashSet<Entity>());
+
+    private final SensorEventListener<Object> eventHandler = new 
SensorEventListener<Object>() {
+        @Override
+        public void onEvent(SensorEvent<Object> event) {
+            if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", 
FollowTheSunPoolImpl.this, event);
+            Entity source = event.getSource();
+            Object value = event.getValue();
+            Sensor sensor = event.getSensor();
+
+            if (sensor.equals(AbstractGroup.MEMBER_ADDED)) {
+                if (source.equals(containerGroup)) {
+                    onContainerAdded((Entity) value);
+                } else if (source.equals(itemGroup)) {
+                    onItemAdded((Entity)value);
+                } else {
+                    throw new IllegalStateException("unexpected event 
source="+source);
+                }
+            } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) {
+                if (source.equals(containerGroup)) {
+                    onContainerRemoved((Entity) value);
+                } else if (source.equals(itemGroup)) {
+                    onItemRemoved((Entity) value);
+                } else {
+                    throw new IllegalStateException("unexpected event 
source="+source);
+                }
+            } else if (sensor.equals(Startable.SERVICE_UP)) {
+                // TODO What if start has failed? Is there a sensor to 
indicate that?
+                if ((Boolean)value) {
+                    onContainerUp(source);
+                } else {
+                    onContainerDown(source);
+                }
+            } else if (sensor.equals(Movable.CONTAINER)) {
+                onItemMoved(source, (Entity) value);
+            } else {
+                throw new IllegalStateException("Unhandled event type 
"+sensor+": "+event);
+            }
+        }
+    };
+
+    public FollowTheSunPoolImpl() {
+    }
+
+    @Override
+    public void setContents(Group containerGroup, Group itemGroup) {
+        this.containerGroup = containerGroup;
+        this.itemGroup = itemGroup;
+        subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
+        subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
+        subscribe(itemGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
+        subscribe(itemGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
+
+        // Process extant containers and items
+        for (Entity existingContainer : containerGroup.getMembers()) {
+            onContainerAdded(existingContainer);
+        }
+        for (Entity existingItem : itemGroup.getMembers()) {
+            onItemAdded((Entity)existingItem);
+        }
+    }
+
+    @Override
+    public Group getContainerGroup() {
+        return containerGroup;
+    }
+
+    @Override
+    public Group getItemGroup() {
+        return itemGroup;
+    }
+
+    @Override
+    public Integer getCurrentSize() {
+        return containerGroup.getCurrentSize();
+    }
+
+    @Override
+    public Integer resize(Integer desiredSize) {
+        if (containerGroup instanceof Resizable) return ((Resizable) 
containerGroup).resize(desiredSize);
+
+        throw new UnsupportedOperationException("Container group is not 
resizable");
+    }
+
+
+    private void onContainerAdded(Entity newContainer) {
+        subscribe(newContainer, Startable.SERVICE_UP, eventHandler);
+        if (!(newContainer instanceof Startable) || 
Boolean.TRUE.equals(newContainer.getAttribute(Startable.SERVICE_UP))) {
+            onContainerUp(newContainer);
+        }
+    }
+
+    private void onContainerUp(Entity newContainer) {
+        if (containers.add(newContainer)) {
+            emit(CONTAINER_ADDED, newContainer);
+        }
+    }
+
+    private void onContainerDown(Entity oldContainer) {
+        if (containers.remove(oldContainer)) {
+            emit(CONTAINER_REMOVED, oldContainer);
+        }
+    }
+
+    private void onContainerRemoved(Entity oldContainer) {
+        unsubscribe(oldContainer);
+        onContainerDown(oldContainer);
+    }
+
+    private void onItemAdded(Entity item) {
+        if (items.add(item)) {
+            subscribe(item, Movable.CONTAINER, eventHandler);
+            emit(ITEM_ADDED, item);
+        }
+    }
+
+    private void onItemRemoved(Entity item) {
+        if (items.remove(item)) {
+            unsubscribe(item);
+            emit(ITEM_REMOVED, item);
+        }
+    }
+
+    private void onItemMoved(Entity item, Entity container) {
+        emit(ITEM_MOVED, new ContainerItemPair(container, item));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunStrategy.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunStrategy.java
 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunStrategy.java
new file mode 100644
index 0000000..68d6ae2
--- /dev/null
+++ 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunStrategy.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.followthesun;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.location.Location;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.brooklyn.policy.loadbalancing.Movable;
+
+import com.google.common.collect.Iterables;
+
+// TODO: extract interface
+public class FollowTheSunStrategy<ContainerType extends Entity, ItemType 
extends Movable> {
+    
+    // This is a modified version of the InterGeographyLatencyPolicy (aka 
Follow-The-Sun) policy from Monterey v3.
+    
+    // TODO location constraints
+    
+    private static final Logger LOG = 
LoggerFactory.getLogger(FollowTheSunStrategy.class);
+    
+    private final FollowTheSunParameters parameters;
+    private final FollowTheSunModel<ContainerType,ItemType> model;
+    private final String name;
+    
+    public FollowTheSunStrategy(FollowTheSunModel<ContainerType,ItemType> 
model, FollowTheSunParameters parameters) {
+        this.model = model;
+        this.parameters = parameters;
+        this.name = model.getName();
+    }
+    
+    public void rebalance() {
+        try {
+            Set<ItemType> items = model.getItems();
+            Map<ItemType, Map<Location, Double>> directSendsToItemByLocation = 
model.getDirectSendsToItemByLocation();
+            
+            for (ItemType item : items) {
+                String itemName = model.getName(item);
+                Location activeLocation = model.getItemLocation(item);
+                ContainerType activeContainer = model.getItemContainer(item);
+                Map<Location, Double> sendsByLocation = 
directSendsToItemByLocation.get(item);
+                if (sendsByLocation == null) sendsByLocation = 
Collections.emptyMap();
+                
+                if (parameters.excludedLocations.contains(activeLocation)) {
+                    if (LOG.isTraceEnabled()) LOG.trace("Ignoring segment {} 
as it is in {}", itemName, activeLocation);
+                    continue;
+                }
+                if (!model.isItemMoveable(item)) {
+                    if (LOG.isDebugEnabled()) LOG.debug("POLICY {} skipping 
any migration of {}, it is not moveable", name, itemName);
+                    continue;
+                }
+                if (model.hasActiveMigration(item)) {
+                    LOG.info("POLICY {} skipping any migration of {}, it is 
involved in an active migration already", name, itemName);
+                    continue;
+                }
+                
+                double total = 
DefaultFollowTheSunModel.sum(sendsByLocation.values());
+
+                if (LOG.isTraceEnabled()) LOG.trace("POLICY {} detected {} 
msgs/sec in {}, split up as: {}", new Object[] {name, total, itemName, 
sendsByLocation});
+                
+                Double current = sendsByLocation.get(activeLocation);
+                if (current == null) current=0d;
+                List<WeightedObject<Location>> locationsWtd = new 
ArrayList<WeightedObject<Location>>();
+                if (total > 0) {
+                    for (Map.Entry<Location, Double> entry : 
sendsByLocation.entrySet()) {
+                        Location l = entry.getKey();
+                        Double d = entry.getValue();
+                        if (d > current) locationsWtd.add(new 
WeightedObject<Location>(l, d));
+                    }
+                }
+                Collections.sort(locationsWtd);
+                Collections.reverse(locationsWtd);
+                
+                double highestMsgRate = -1;
+                Location highestLocation = null;
+                ContainerType optimalContainerInHighest = null;
+                while (!locationsWtd.isEmpty()) {
+                    WeightedObject<Location> weightedObject = 
locationsWtd.remove(0);
+                    highestMsgRate = weightedObject.getWeight();
+                    highestLocation = weightedObject.getObject();
+                    optimalContainerInHighest = 
findOptimal(model.getAvailableContainersFor(item, highestLocation));
+                    if (optimalContainerInHighest != null) {
+                        break;
+                    }
+                }
+                if (optimalContainerInHighest == null) {
+                    if (LOG.isDebugEnabled()) LOG.debug("POLICY {} detected {} 
is already in optimal permitted location ({} of {} msgs/sec)", new Object[] 
{name, itemName, highestMsgRate, total});
+                    continue;                   
+                }
+                
+                double nextHighestMsgRate = -1;
+                ContainerType optimalContainerInNextHighest = null;
+                while (!locationsWtd.isEmpty()) {
+                    WeightedObject<Location> weightedObject = 
locationsWtd.remove(0);
+                    nextHighestMsgRate = weightedObject.getWeight();
+                    Location nextHighestLocation = weightedObject.getObject();
+                    optimalContainerInNextHighest = 
findOptimal(model.getAvailableContainersFor(item, nextHighestLocation));
+                    if (optimalContainerInNextHighest != null) {
+                        break;
+                    }
+                }
+                if (optimalContainerInNextHighest == null) {
+                    nextHighestMsgRate = current;
+                }
+                
+                if (parameters.isTriggered(highestMsgRate, total, 
nextHighestMsgRate, current)) {
+                    LOG.info("POLICY "+name+" detected "+itemName+" should be 
in location "+highestLocation+" on "+optimalContainerInHighest+" 
("+highestMsgRate+" of "+total+" msgs/sec), migrating");
+                    try {
+                        if (activeContainer.equals(optimalContainerInHighest)) 
{
+                            //shouldn't happen
+                            LOG.warn("POLICY "+name+" detected "+itemName+" 
should move to "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" 
msgs/sec) but it is already there with "+current+" msgs/sec");
+                        } else {
+                            item.move(optimalContainerInHighest);
+                            model.onItemMoved(item, optimalContainerInHighest);
+                        }
+                    } catch (Exception e) {
+                        LOG.warn("POLICY "+name+" detected "+itemName+" should 
be on "+optimalContainerInHighest+", but can't move it: "+e, e);
+                    }
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("POLICY "+name+" 
detected "+itemName+" need not move to "+optimalContainerInHighest+" 
("+highestMsgRate+" of "+total+" msgs/sec not much better than "+current+" at 
"+activeContainer+")");
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Error in policy "+name+" (ignoring): "+e, e);
+        }
+    }
+
+    private ContainerType findOptimal(Collection<ContainerType> contenders) {
+        /*
+         * TODO should choose the least loaded mediator. Currently chooses 
first available, and relies 
+         * on a load-balancer to move it again; would be good if these could 
share decision code so move 
+         * it to the right place immediately. e.g.
+         *   policyUtil.findLeastLoadedMediator(nodesInLocation);
+         */
+        return (contenders.isEmpty() ? null : Iterables.get(contenders, 0));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/WeightedObject.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/WeightedObject.java
 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/WeightedObject.java
new file mode 100644
index 0000000..e1caf1f
--- /dev/null
+++ 
b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/WeightedObject.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.followthesun;
+
+public class WeightedObject<T> implements Comparable<WeightedObject<T>>{
+    
+    final T object;
+    final double weight;
+    
+    public WeightedObject(T obj, double weight) {
+        this.object = obj;
+        this.weight = weight;
+    }
+    
+    public T getObject() {
+        return object;
+    }
+    
+    public double getWeight() {
+        return weight;
+    }
+
+    /**
+     * Note that equals and compareTo are not consistent: x.compareTo(y)==0 
iff x.equals(y) is 
+     * highly recommended in Java, but is not required. This can make TreeSet 
etc behave poorly...
+     */
+    public int compareTo(WeightedObject<T> o) {
+        double diff = o.getWeight() - weight;
+        if (diff>0.0000000000000001) return -1;
+        if (diff<-0.0000000000000001) return 1;
+        return 0;
+    }
+
+    @Override
+    /** true irrespective of weight */
+    public boolean equals(Object obj) {
+        if (!(obj instanceof WeightedObject<?>)) return false;
+        if (getObject()==null) {
+            return ((WeightedObject<?>)obj).getObject() == null;
+        } else {
+            return getObject().equals( ((WeightedObject<?>)obj).getObject() );
+        }
+    }
+    
+    @Override
+    public int hashCode() {
+        if (getObject()==null) return 234519078;
+        return getObject().hashCode();
+    }
+    
+    @Override
+    public String toString() {
+        return ""+getObject()+"["+getWeight()+"]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
 
b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
new file mode 100644
index 0000000..d7af68a
--- /dev/null
+++ 
b/policy/src/main/java/org/apache/brooklyn/policy/ha/AbstractFailureDetector.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import static brooklyn.util.time.Time.makeTimeStringRounded;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+import org.apache.brooklyn.core.util.task.BasicTask;
+import org.apache.brooklyn.core.util.task.ScheduledTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.EntityInternal;
+
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.reflect.TypeToken;
+
+public abstract class AbstractFailureDetector extends AbstractPolicy {
+
+    // TODO Remove duplication from ServiceFailureDetector, particularly for 
the stabilisation delays.
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractFailureDetector.class);
+
+    private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100;
+
+    public static final ConfigKey<Duration> POLL_PERIOD = 
ConfigKeys.newDurationConfigKey(
+            "failureDetector.pollPeriod", "", Duration.ONE_SECOND);
+
+    @SetFromFlag("failedStabilizationDelay")
+    public static final ConfigKey<Duration> FAILED_STABILIZATION_DELAY = 
ConfigKeys.newDurationConfigKey(
+            "failureDetector.serviceFailedStabilizationDelay",
+            "Time period for which the health check consistently fails "
+                    + "(e.g. doesn't report failed-ok-faled) before concluding 
failure.",
+            Duration.ZERO);
+
+    @SetFromFlag("recoveredStabilizationDelay")
+    public static final ConfigKey<Duration> RECOVERED_STABILIZATION_DELAY = 
ConfigKeys.newDurationConfigKey(
+            "failureDetector.serviceRecoveredStabilizationDelay",
+            "Time period for which the health check succeeds continiually " +
+                    "(e.g. doesn't report ok-failed-ok) before concluding 
recovered",
+            Duration.ZERO);
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_FAILED = 
ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {},
+            "failureDetector.sensor.fail", "A sensor which will indicate 
failure when set", HASensors.ENTITY_FAILED);
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_RECOVERED 
= ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {},
+            "failureDetector.sensor.recover", "A sensor which will indicate 
recovery from failure when set", HASensors.ENTITY_RECOVERED);
+
+    public interface CalculatedStatus {
+        boolean isHealthy();
+        String getDescription();
+    }
+
+    private final class PublishJob implements Runnable {
+        @Override public void run() {
+            try {
+                executorTime = System.currentTimeMillis();
+                executorQueued.set(false);
+
+                publishNow();
+
+            } catch (Exception e) {
+                if (isRunning()) {
+                    LOG.error("Problem resizing: "+e, e);
+                } else {
+                    if (LOG.isDebugEnabled()) LOG.debug("Problem resizing, but 
no longer running: "+e, e);
+                }
+            } catch (Throwable t) {
+                LOG.error("Problem in service-failure-detector: "+t, t);
+                throw Exceptions.propagate(t);
+            }
+        }
+    }
+
+    private final class HealthPoller implements Runnable {
+        @Override
+        public void run() {
+            checkHealth();
+        }
+    }
+
+    private final class HealthPollingTaskFactory implements Callable<Task<?>> {
+        @Override
+        public Task<?> call() {
+            BasicTask<Void> task = new BasicTask<Void>(new HealthPoller());
+            BrooklynTaskTags.setTransient(task);
+            return task;
+        }
+    }
+
+    protected static class BasicCalculatedStatus implements CalculatedStatus {
+        private boolean healthy;
+        private String description;
+
+        public BasicCalculatedStatus(boolean healthy, String description) {
+            this.healthy = healthy;
+            this.description = description;
+        }
+
+        @Override
+        public boolean isHealthy() {
+            return healthy;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+
+    public enum LastPublished {
+        NONE,
+        FAILED,
+        RECOVERED;
+    }
+
+    protected final AtomicReference<Long> stateLastGood = new 
AtomicReference<Long>();
+    protected final AtomicReference<Long> stateLastFail = new 
AtomicReference<Long>();
+
+    protected Long currentFailureStartTime = null;
+    protected Long currentRecoveryStartTime = null;
+
+    protected LastPublished lastPublished = LastPublished.NONE;
+
+    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
+    private volatile long executorTime = 0;
+
+    private Callable<Task<?>> pollingTaskFactory = new 
HealthPollingTaskFactory();
+
+    private Task<?> scheduledTask;
+
+    protected abstract CalculatedStatus calculateStatus();
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+
+        if (isRunning()) {
+            doStartPolling();
+        }
+    }
+
+    @Override
+    public void suspend() {
+        scheduledTask.cancel(true);
+        super.suspend();
+    }
+
+    @Override
+    public void resume() {
+        currentFailureStartTime = null;
+        currentRecoveryStartTime = null;
+        lastPublished = LastPublished.NONE;
+        executorQueued.set(false);
+        executorTime = 0;
+
+        super.resume();
+        doStartPolling();
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void doStartPolling() {
+        if (scheduledTask == null || scheduledTask.isDone()) {
+            ScheduledTask task = new ScheduledTask(MutableMap.of("period", 
getPollPeriod(), "displayName", getTaskName()), pollingTaskFactory);
+            scheduledTask = 
((EntityInternal)entity).getExecutionContext().submit(task);
+        }
+    }
+
+    private String getTaskName() {
+        return getDisplayName();
+    }
+
+    protected Duration getPollPeriod() {
+        return getConfig(POLL_PERIOD);
+    }
+
+    protected Duration getFailedStabilizationDelay() {
+        return getConfig(FAILED_STABILIZATION_DELAY);
+    }
+
+    protected Duration getRecoveredStabilizationDelay() {
+        return getConfig(RECOVERED_STABILIZATION_DELAY);
+    }
+
+    protected Sensor<FailureDescriptor> getSensorFailed() {
+        return getConfig(SENSOR_FAILED);
+    }
+
+    protected Sensor<FailureDescriptor> getSensorRecovered() {
+        return getConfig(SENSOR_RECOVERED);
+    }
+
+    private synchronized void checkHealth() {
+        CalculatedStatus status = calculateStatus();
+        boolean healthy = status.isHealthy();
+        long now = System.currentTimeMillis();
+
+        if (healthy) {
+            stateLastGood.set(now);
+            if (lastPublished == LastPublished.FAILED) {
+                if (currentRecoveryStartTime == null) {
+                    LOG.info("{} check for {}, now recovering: {}", new 
Object[] {this, entity, getDescription(status)});
+                    currentRecoveryStartTime = now;
+                    schedulePublish();
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, 
continuing recovering: {}", new Object[] {this, entity, 
getDescription(status)});
+                }
+            } else {
+                if (currentFailureStartTime != null) {
+                    LOG.info("{} check for {}, now healthy: {}", new Object[] 
{this, entity, getDescription(status)});
+                    currentFailureStartTime = null;
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, 
still healthy: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            }
+        } else {
+            stateLastFail.set(now);
+            if (lastPublished != LastPublished.FAILED) {
+                if (currentFailureStartTime == null) {
+                    LOG.info("{} check for {}, now failing: {}", new Object[] 
{this, entity, getDescription(status)});
+                    currentFailureStartTime = now;
+                    schedulePublish();
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, 
continuing failing: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            } else {
+                if (currentRecoveryStartTime != null) {
+                    LOG.info("{} check for {}, now failing: {}", new Object[] 
{this, entity, getDescription(status)});
+                    currentRecoveryStartTime = null;
+                } else {
+                    if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, 
still failed: {}", new Object[] {this, entity, getDescription(status)});
+                }
+            }
+        }
+    }
+
+    protected void schedulePublish() {
+        schedulePublish(0);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void schedulePublish(long delay) {
+        if (isRunning() && executorQueued.compareAndSet(false, true)) {
+            long now = System.currentTimeMillis();
+            delay = Math.max(0, Math.max(delay, (executorTime + 
MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
+            if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in 
{}ms", this, delay);
+
+            Runnable job = new PublishJob();
+
+            ScheduledTask task = new ScheduledTask(MutableMap.of("delay", 
Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask<Void>(job));
+            ((EntityInternal)entity).getExecutionContext().submit(task);
+        }
+    }
+
+    private synchronized void publishNow() {
+        if (!isRunning()) return;
+
+        CalculatedStatus calculatedStatus = calculateStatus();
+        boolean healthy = calculatedStatus.isHealthy();
+
+        Long lastUpTime = stateLastGood.get();
+        Long lastDownTime = stateLastFail.get();
+        long serviceFailedStabilizationDelay = 
getFailedStabilizationDelay().toMilliseconds();
+        long serviceRecoveredStabilizationDelay = 
getRecoveredStabilizationDelay().toMilliseconds();
+        long now = System.currentTimeMillis();
+
+        if (healthy) {
+            if (lastPublished == LastPublished.FAILED) {
+                // only publish if consistently up for 
serviceRecoveredStabilizationDelay
+                long currentRecoveryPeriod = getTimeDiff(now, 
currentRecoveryStartTime);
+                long sinceLastDownPeriod = getTimeDiff(now, lastDownTime);
+                if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay 
&& sinceLastDownPeriod > serviceRecoveredStabilizationDelay) {
+                    String description = getDescription(calculatedStatus);
+                    LOG.warn("{} check for {}, publishing recovered: {}", new 
Object[] {this, entity, description});
+                    entity.emit(getSensorRecovered(), new 
HASensors.FailureDescriptor(entity, description));
+                    lastPublished = LastPublished.RECOVERED;
+                    currentFailureStartTime = null;
+                } else {
+                    long nextAttemptTime = 
Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, 
serviceRecoveredStabilizationDelay - sinceLastDownPeriod);
+                    schedulePublish(nextAttemptTime);
+                }
+            }
+        } else {
+            if (lastPublished != LastPublished.FAILED) {
+                // only publish if consistently down for 
serviceFailedStabilizationDelay
+                long currentFailurePeriod = getTimeDiff(now, 
currentFailureStartTime);
+                long sinceLastUpPeriod = getTimeDiff(now, lastUpTime);
+                if (currentFailurePeriod > serviceFailedStabilizationDelay && 
sinceLastUpPeriod > serviceFailedStabilizationDelay) {
+                    String description = getDescription(calculatedStatus);
+                    LOG.warn("{} connectivity-check for {}, publishing failed: 
{}", new Object[] {this, entity, description});
+                    entity.emit(getSensorFailed(), new 
HASensors.FailureDescriptor(entity, description));
+                    lastPublished = LastPublished.FAILED;
+                    currentRecoveryStartTime = null;
+                } else {
+                    long nextAttemptTime = 
Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, 
serviceFailedStabilizationDelay - sinceLastUpPeriod);
+                    schedulePublish(nextAttemptTime);
+                }
+            }
+        }
+    }
+
+    protected String getDescription(CalculatedStatus status) {
+        Long lastUpTime = stateLastGood.get();
+        Long lastDownTime = stateLastGood.get();
+        Duration serviceFailedStabilizationDelay = 
getFailedStabilizationDelay();
+        Duration serviceRecoveredStabilizationDelay = 
getRecoveredStabilizationDelay();
+
+        return String.format("%s; healthy=%s; timeNow=%s; lastUp=%s; 
lastDown=%s; lastPublished=%s; "+
+                    "currentFailurePeriod=%s; currentRecoveryPeriod=%s",
+                status.getDescription(),
+                status.isHealthy(),
+                Time.makeDateString(System.currentTimeMillis()),
+                (lastUpTime != null ? Time.makeDateString(lastUpTime) : 
"<never>"),
+                (lastDownTime != null ? Time.makeDateString(lastDownTime) : 
"<never>"),
+                lastPublished,
+                (currentFailureStartTime != null ? 
getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization 
"+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")",
+                (currentRecoveryStartTime != null ? 
getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization 
"+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")");
+    }
+
+    private long getTimeDiff(Long recent, Long previous) {
+        return (previous == null) ? recent : (recent - previous);
+    }
+
+    private String getTimeStringSince(Long time) {
+        return time == null ? null : 
Time.makeTimeStringRounded(System.currentTimeMillis() - time);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/ConditionalSuspendPolicy.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/org/apache/brooklyn/policy/ha/ConditionalSuspendPolicy.java
 
b/policy/src/main/java/org/apache/brooklyn/policy/ha/ConditionalSuspendPolicy.java
new file mode 100644
index 0000000..4bc1dc1
--- /dev/null
+++ 
b/policy/src/main/java/org/apache/brooklyn/policy/ha/ConditionalSuspendPolicy.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.policy.Policy;
+import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.util.javalang.JavaClassNames;
+
+import com.google.common.base.Preconditions;
+
+public class ConditionalSuspendPolicy extends AbstractPolicy {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConditionalSuspendPolicy.class);
+
+    @SetFromFlag("suppressSensor")
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ConfigKey<Sensor<?>> SUSPEND_SENSOR = (ConfigKey) 
ConfigKeys.newConfigKey(Sensor.class,
+            "suppressSensor", "Sensor which will suppress the target policy", 
HASensors.CONNECTION_FAILED); 
+
+    @SetFromFlag("resetSensor")
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static final ConfigKey<Sensor<?>> RESUME_SENSOR = (ConfigKey) 
ConfigKeys.newConfigKey(Sensor.class,
+            "resetSensor", "Resume target policy when this sensor is 
observed", HASensors.CONNECTION_RECOVERED);
+
+    @SetFromFlag("target")
+    public static final ConfigKey<Object> SUSPEND_TARGET = 
ConfigKeys.newConfigKey(Object.class,
+            "target", "The target policy to suspend. Either direct reference 
or the value of the suspendTarget config on a policy from the same entity.");
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        Object target = config().get(SUSPEND_TARGET);
+        Preconditions.checkNotNull(target, "Suspend target required");
+        Preconditions.checkNotNull(getTargetPolicy(), "Can't find target 
policy set in " + SUSPEND_TARGET.getName() + ": " + target);
+        subscribe();
+        uniqueTag = 
JavaClassNames.simpleClassName(getClass())+":"+getConfig(SUSPEND_SENSOR).getName()+":"+getConfig(RESUME_SENSOR).getName();
+    }
+
+    private void subscribe() {
+        subscribe(entity, getConfig(SUSPEND_SENSOR), new 
SensorEventListener<Object>() {
+            @Override public void onEvent(final SensorEvent<Object> event) {
+                if (isRunning()) {
+                    Policy target = getTargetPolicy();
+                    target.suspend();
+                    LOG.debug("Suspended policy " + target + ", triggered by " 
+ event.getSensor() + " = " + event.getValue());
+                }
+            }
+
+        });
+        subscribe(entity, getConfig(RESUME_SENSOR), new 
SensorEventListener<Object>() {
+            @Override public void onEvent(final SensorEvent<Object> event) {
+                if (isRunning()) {
+                    Policy target = getTargetPolicy();
+                    target.resume();
+                    LOG.debug("Resumed policy " + target + ", triggered by " + 
event.getSensor() + " = " + event.getValue());
+                }
+            }
+        });
+    }
+
+    private Policy getTargetPolicy() {
+        Object target = config().get(SUSPEND_TARGET);
+        if (target instanceof Policy) {
+            return (Policy)target;
+        } else if (target instanceof String) {
+            for (Policy policy : entity.getPolicies()) {
+                // No way to set config values for keys NOT declared in the 
policy,
+                // so must use displayName as a generally available config 
value.
+                if (target.equals(policy.getDisplayName()) || 
target.equals(policy.getClass().getName())) {
+                    return policy;
+                }
+            }
+        } else {
+            throw new IllegalStateException("Unexpected type " + 
target.getClass() + " for target " + target);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetector.java
----------------------------------------------------------------------
diff --git 
a/policy/src/main/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetector.java
 
b/policy/src/main/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetector.java
new file mode 100644
index 0000000..2cca77f
--- /dev/null
+++ 
b/policy/src/main/java/org/apache/brooklyn/policy/ha/ConnectionFailureDetector.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.BasicNotificationSensor;
+
+import org.apache.brooklyn.policy.ha.HASensors.FailureDescriptor;
+
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.net.Networking;
+import brooklyn.util.time.Duration;
+
+import com.google.common.net.HostAndPort;
+
+/**
+ * Monitors a given {@link HostAndPort}, to emit HASensors.CONNECTION_FAILED 
and HASensors.CONNECTION_RECOVERED 
+ * if the connection is lost/restored.
+ */
+@Catalog(name="Connection Failure Detector", description="HA policy for 
monitoring a host:port, "
+        + "emitting an event if the connection is lost/restored")
+public class ConnectionFailureDetector extends AbstractFailureDetector {
+
+    public static final ConfigKey<HostAndPort> ENDPOINT = 
ConfigKeys.newConfigKey(HostAndPort.class, 
"connectionFailureDetector.endpoint");
+
+    public static final ConfigKey<Duration> POLL_PERIOD = 
ConfigKeys.newConfigKey(Duration.class, "connectionFailureDetector.pollPeriod", 
"", Duration.ONE_SECOND);
+
+    public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
+
+    public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
+
+    @SetFromFlag("connectionFailedStabilizationDelay")
+    public static final ConfigKey<Duration> 
CONNECTION_FAILED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
+            .name("connectionFailureDetector.serviceFailedStabilizationDelay")
+            .description("Time period for which the connection must be 
consistently down for "
+                    + "(e.g. doesn't report down-up-down) before concluding 
failure. "
+                    + "Note that long TCP timeouts mean there can be long 
(e.g. 70 second) "
+                    + "delays in noticing a connection refused condition.")
+            .defaultValue(Duration.ZERO)
+            .build();
+
+    @SetFromFlag("connectionRecoveredStabilizationDelay")
+    public static final ConfigKey<Duration> 
CONNECTION_RECOVERED_STABILIZATION_DELAY = 
BasicConfigKey.builder(Duration.class)
+            
.name("connectionFailureDetector.serviceRecoveredStabilizationDelay")
+            .description("For a failed connection, time period for which the 
connection must be consistently up for (e.g. doesn't report up-down-up) before 
concluding recovered")
+            .defaultValue(Duration.ZERO)
+            .build();
+
+    @Override
+    public void init() {
+        super.init();
+        getRequiredConfig(ENDPOINT); // just to confirm it's set, failing fast
+        if (config().getRaw(SENSOR_FAILED).isAbsent()) {
+            config().set(SENSOR_FAILED, CONNECTION_FAILED);
+        }
+        if (config().getRaw(SENSOR_RECOVERED).isAbsent()) {
+            config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
+        }
+    }
+
+    @Override
+    protected CalculatedStatus calculateStatus() {
+        HostAndPort endpoint = getConfig(ENDPOINT);
+        boolean isHealthy = Networking.isReachable(endpoint);
+        return new BasicCalculatedStatus(isHealthy, "endpoint=" + endpoint);
+    }
+
+    //Persistence compatibility overrides
+    @Override
+    protected Duration getPollPeriod() {
+        return getConfig(POLL_PERIOD);
+    }
+
+    @Override
+    protected Duration getFailedStabilizationDelay() {
+        return getConfig(CONNECTION_FAILED_STABILIZATION_DELAY);
+    }
+
+    @Override
+    protected Duration getRecoveredStabilizationDelay() {
+        return getConfig(CONNECTION_RECOVERED_STABILIZATION_DELAY);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Sensor<FailureDescriptor> getSensorFailed() {
+        Maybe<Object> sensorFailed = config().getRaw(SENSOR_FAILED);
+        if (sensorFailed.isPresent()) {
+            return (Sensor<FailureDescriptor>)sensorFailed.get();
+        } else {
+            return CONNECTION_FAILED;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Sensor<FailureDescriptor> getSensorRecovered() {
+        Maybe<Object> sensorRecovered = config().getRaw(SENSOR_RECOVERED);
+        if (sensorRecovered.isPresent()) {
+            return (Sensor<FailureDescriptor>)sensorRecovered.get();
+        } else {
+            return CONNECTION_RECOVERED;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/ha/HASensors.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/HASensors.java 
b/policy/src/main/java/org/apache/brooklyn/policy/ha/HASensors.java
new file mode 100644
index 0000000..24763f0
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/HASensors.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.ha;
+
+import brooklyn.event.basic.BasicNotificationSensor;
+
+import com.google.common.base.Objects;
+
+public class HASensors {
+
+    public static final BasicNotificationSensor<FailureDescriptor> 
ENTITY_FAILED = new BasicNotificationSensor<FailureDescriptor>(
+            FailureDescriptor.class, "ha.entityFailed", "Indicates that an 
entity has failed");
+    
+    public static final BasicNotificationSensor<FailureDescriptor> 
ENTITY_RECOVERED = new BasicNotificationSensor<FailureDescriptor>(
+            FailureDescriptor.class, "ha.entityRecovered", "Indicates that a 
previously failed entity has recovered");
+    
+    public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_FAILED = new BasicNotificationSensor<FailureDescriptor>(
+            FailureDescriptor.class, "ha.connectionFailed", "Indicates that a 
connection has failed");
+    
+    public static final BasicNotificationSensor<FailureDescriptor> 
CONNECTION_RECOVERED = new BasicNotificationSensor<FailureDescriptor>(
+            FailureDescriptor.class, "ha.connectionRecovered", "Indicates that 
a previously failed connection has recovered");
+    
+    // TODO How to make this serializable with the entity reference
+    public static class FailureDescriptor {
+        private final Object component;
+        private final String description;
+        
+        public FailureDescriptor(Object component, String description) {
+            this.component = component;
+            this.description = description;
+        }
+        
+        public Object getComponent() {
+            return component;
+        }
+        
+        public String getDescription() {
+            return description;
+        }
+        
+        @Override
+        public String toString() {
+            return Objects.toStringHelper(this).add("component", 
component).add("description", description).toString();
+        }
+    }
+}

Reply via email to