BROOKLYN-322: feeds donât poll if entity unmanaged
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/b54814d3 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/b54814d3 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/b54814d3 Branch: refs/heads/master Commit: b54814d39bc3e9cd4a97c77bdfbcd2fd7b59516b Parents: 9d32882 Author: Aled Sage <aled.s...@gmail.com> Authored: Wed Aug 3 14:26:38 2016 +0100 Committer: Aled Sage <aled.s...@gmail.com> Committed: Wed Aug 3 18:01:23 2016 +0100 ---------------------------------------------------------------------- .../org/apache/brooklyn/core/feed/Poller.java | 3 + .../SoftwareProcessEntityFeedRebindTest.java | 175 +++++++++++++++++++ 2 files changed, 178 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b54814d3/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java index eed8f63..9d8e0f8 100644 --- a/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java +++ b/core/src/main/java/org/apache/brooklyn/core/feed/Poller.java @@ -147,6 +147,9 @@ public class Poller<V> { public Task<?> call() { DynamicSequentialTask<Void> task = new DynamicSequentialTask<Void>(MutableMap.of("displayName", scheduleName, "entity", entity), new Callable<Void>() { public Void call() { + if (!Entities.isManaged(entity)) { + return null; + } if (onlyIfServiceUp && !Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) { return null; } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/b54814d3/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityFeedRebindTest.java ---------------------------------------------------------------------- diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityFeedRebindTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityFeedRebindTest.java new file mode 100644 index 0000000..506ff39 --- /dev/null +++ b/software/base/src/test/java/org/apache/brooklyn/entity/software/base/SoftwareProcessEntityFeedRebindTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.software.base; + +import static org.testng.Assert.assertFalse; + +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntityLocal; +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.location.LocationSpec; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.core.entity.BrooklynConfigKeys; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.core.test.entity.TestApplication; +import org.apache.brooklyn.entity.software.base.SoftwareProcessEntityRebindTest.MyProvisioningLocation; +import org.apache.brooklyn.entity.software.base.SoftwareProcessEntityTest.MyService; +import org.apache.brooklyn.entity.software.base.SoftwareProcessEntityTest.MyServiceImpl; +import org.apache.brooklyn.entity.software.base.SoftwareProcessEntityTest.SimulatedDriver; +import org.apache.brooklyn.feed.function.FunctionFeed; +import org.apache.brooklyn.feed.function.FunctionPollConfig; +import org.apache.brooklyn.location.ssh.SshMachineLocation; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import com.google.common.base.Functions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class SoftwareProcessEntityFeedRebindTest extends RebindTestFixtureWithApp { + + private static final Logger LOG = LoggerFactory.getLogger(SoftwareProcessEntityFeedRebindTest.class); + + @Override + protected boolean enablePersistenceBackups() { + return false; + } + + @Test + public void testFeedsDoNotPollUntilManaged() throws Exception { + runFeedsDoNotPollUntilManaged(1); + } + + @Test(groups="Integeration") + public void testFeedsDoNotPollUntilManagedManyEntities() throws Exception { + runFeedsDoNotPollUntilManaged(100); + } + + protected void runFeedsDoNotPollUntilManaged(int numEntities) throws Exception { + List<MyService> origEs = Lists.newArrayList(); + + LOG.info("Creating "+numEntities+" entities"); + for (int i = 0; i < numEntities; i++) { + origEs.add(origApp.createAndManageChild(EntitySpec.create(MyService.class) + .impl(MyServiceWithFeedsImpl.class) + .configure(BrooklynConfigKeys.SKIP_ON_BOX_BASE_DIR_RESOLUTION, true))); + } + + LOG.info("Starting "+numEntities+" entities"); + MyProvisioningLocation origLoc = mgmt().getLocationManager().createLocation(LocationSpec.create(MyProvisioningLocation.class) + .displayName("mylocname")); + origApp.start(ImmutableList.of(origLoc)); + + LOG.info("Rebinding "+numEntities+" entities"); + newApp = (TestApplication) rebind(); + List<Entity> newEs = ImmutableList.copyOf(newApp.getChildren()); + + LOG.info("Checking state of "+numEntities+" entities"); + for (Entity newE : newEs) { + EntityAsserts.assertAttributeChangesEventually(newE, MyServiceWithFeeds.COUNTER); + assertFalse(((MyServiceWithFeeds)newE).isFeedCalledWhenNotManaged()); + SimulatedDriverWithFeeds driver = (SimulatedDriverWithFeeds) ((MyServiceWithFeeds)newE).getDriver(); + assertFalse(driver.isRunningCalledWhenNotManaged); + } + } + + public static interface MyServiceWithFeeds extends MyService { + AttributeSensor<Integer> COUNTER = Sensors.newIntegerSensor("counter"); + + boolean isFeedCalledWhenNotManaged(); + } + + public static class MyServiceWithFeedsImpl extends MyServiceImpl implements MyServiceWithFeeds { + protected FunctionFeed functionFeed; + protected boolean feedCalledWhenNotManaged; + + @Override + public boolean isFeedCalledWhenNotManaged() { + return feedCalledWhenNotManaged; + } + + @Override + public void init() { + super.init(); + + // By calling feeds().add(...), it will persist the feed, and rebind it + functionFeed = feeds().add(FunctionFeed.builder() + .entity(this) + .period(Duration.millis(10)) + .uniqueTag("MyserviceWithFeeds-functionFeed") + .poll(new FunctionPollConfig<Integer, Integer>(COUNTER) + .suppressDuplicates(true) + .onException(Functions.constant(-1)) + .callable(new Callable<Integer>() { + public Integer call() { + if (!Entities.isManaged(MyServiceWithFeedsImpl.this)) { + feedCalledWhenNotManaged = true; + throw new IllegalStateException("Entity "+MyServiceWithFeedsImpl.this+" is not managed in feed.call"); + } + Integer oldVal = sensors().get(COUNTER); + return (oldVal == null ? 0 : oldVal) + 1; + } + })) + .build()); + } + + @Override + protected void connectSensors() { + // connectSensors is called on rebind; it will re-register the feed + super.connectSensors(); + super.connectServiceUpIsRunning(); + } + + @Override + protected void disconnectSensors() { + super.disconnectSensors(); + super.disconnectServiceUpIsRunning(); + } + + @Override + public Class<?> getDriverInterface() { + return SimulatedDriverWithFeeds.class; + } + } + + public static class SimulatedDriverWithFeeds extends SimulatedDriver { + protected boolean isRunningCalledWhenNotManaged = false; + + public SimulatedDriverWithFeeds(EntityLocal entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + public boolean isRunning() { + if (!Entities.isManaged(entity)) { + isRunningCalledWhenNotManaged = true; + throw new IllegalStateException("Entity "+entity+" is not managed in driver.isRunning"); + } + return super.isRunning(); + } + } +}