Repository: brooklyn-server Updated Branches: refs/heads/master cd40893d1 -> 7e2b497cb
SshCommandSensor: no anonymous inner classes Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/d4901150 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/d4901150 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/d4901150 Branch: refs/heads/master Commit: d490115023b6c53d3a79b72f8ec190f14e5eb949 Parents: 3afc8fe Author: Aled Sage <aled.s...@gmail.com> Authored: Thu Apr 26 20:57:33 2018 +0100 Committer: Aled Sage <aled.s...@gmail.com> Committed: Fri Apr 27 10:09:31 2018 +0100 ---------------------------------------------------------------------- .../SshCommandSensorYamlRebindTest.java | 89 +++++++++++++ .../core/sensor/ssh/SshCommandSensor.java | 124 +++++++++++++++---- .../brooklyn/util/core/flags/TypeCoercions.java | 15 ++- .../core/mgmt/rebind/RebindTestFixture.java | 33 ++++- .../apache/brooklyn/util/guava/Functionals.java | 21 +++- .../brooklyn/util/text/StringFunctions.java | 12 ++ 6 files changed, 263 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4901150/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlRebindTest.java ---------------------------------------------------------------------- diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlRebindTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlRebindTest.java new file mode 100644 index 0000000..0c0730e --- /dev/null +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/SshCommandSensorYamlRebindTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.camp.brooklyn; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.entity.StartableApplication; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.entity.software.base.VanillaSoftwareProcess; +import org.apache.brooklyn.feed.ssh.SshFeed; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool; +import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool.ExecCmd; +import org.testng.annotations.Test; + +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; + +@Test +public class SshCommandSensorYamlRebindTest extends AbstractYamlRebindTest { + + @Test + public void testSshCommandSensorWithEffectorInEnv() throws Exception { + RecordingSshTool.setCustomResponse(".*myCommand.*", new RecordingSshTool.CustomResponse(0, "myResponse", null)); + + createStartWaitAndLogApplication( + "location:", + " localhost:", + " sshToolClass: "+RecordingSshTool.class.getName(), + "services:", + "- type: " + VanillaSoftwareProcess.class.getName(), + " brooklyn.config:", + " onbox.base.dir.skipResolution: true", + " brooklyn.initializers:", + " - type: org.apache.brooklyn.core.sensor.ssh.SshCommandSensor", + " brooklyn.config:", + " name: mySensor", + " command: myCommand", + " executionDir: '/path/to/myexecutiondir'", + " shell.env:", + " MY_ENV: myEnvVal", + " period: 10ms", + " onlyIfServiceUp: false"); + + StartableApplication newApp = rebind(); + VanillaSoftwareProcess newEntity = (VanillaSoftwareProcess) Iterables.getOnlyElement(newApp.getChildren()); + SshFeed newFeed = (SshFeed) Iterables.find(((EntityInternal)newEntity).feeds().getFeeds(), Predicates.instanceOf(SshFeed.class)); + + // Clear history of commands, and the sensor, so can confirm it gets re-set by the ssh feed + RecordingSshTool.clearCmdHistory(); + newEntity.sensors().set(Sensors.newStringSensor("mySensor"), null); + + // Assert sensor is set, and command is executed as expected + EntityAsserts.assertAttributeEqualsEventually(newEntity, Sensors.newStringSensor("mySensor"), "myResponse"); + ExecCmd cmd = Asserts.succeedsEventually(() -> RecordingSshTool.getLastExecCmd()); + + assertTrue(cmd.commands.toString().contains("myCommand"), "cmds="+cmd.commands); + assertEquals(cmd.env.get("MY_ENV"), "myEnvVal", "env="+cmd.env); + assertTrue(cmd.commands.toString().contains("/path/to/myexecutiondir"), "cmds="+cmd.commands); + + // Confirm feed's memento is 'clean' - no anonymous inner classes + BrooklynMementoRawData rawMemento = loadMementoRawData(); + String rawFeedMemento = rawMemento.getFeeds().get(newFeed.getId()); + assertFalse(rawFeedMemento.contains("$1"), rawFeedMemento); + assertFalse(rawFeedMemento.contains("$2"), rawFeedMemento); + assertFalse(rawFeedMemento.contains("$3"), rawFeedMemento); + } +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4901150/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java index b03b54f..b77dde3 100644 --- a/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/ssh/SshCommandSensor.java @@ -41,7 +41,9 @@ import org.apache.brooklyn.util.core.flags.TypeCoercions; import org.apache.brooklyn.util.core.json.ShellEnvironmentSerializer; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.guava.Functionals; import org.apache.brooklyn.util.os.Os; +import org.apache.brooklyn.util.text.StringFunctions; import org.apache.brooklyn.util.text.Strings; import org.apache.brooklyn.util.time.Duration; import org.slf4j.Logger; @@ -53,6 +55,7 @@ import com.google.common.base.Functions; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; +import com.google.common.reflect.TypeToken; /** * Configurable {@link EntityInitializer} which adds an SSH sensor feed running the <code>command</code> supplied @@ -99,7 +102,34 @@ public final class SshCommandSensor<T> extends AbstractAddSensorFeed<T> { final Duration logWarningGraceTimeOnStartup = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME_ON_STARTUP); final Duration logWarningGraceTime = EntityInitializers.resolve(params, LOG_WARNING_GRACE_TIME); - Supplier<Map<String,String>> envSupplier = new Supplier<Map<String,String>>() { + Supplier<Map<String,String>> envSupplier = new EnvSupplier(entity, params); + + Supplier<String> commandSupplier = new CommandSupplier(entity, params); + + CommandPollConfig<T> pollConfig = new CommandPollConfig<T>(sensor) + .period(period) + .env(envSupplier) + .command(commandSupplier) + .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates)) + .checkSuccess(SshValueFunctions.exitStatusEquals(0)) + .onFailureOrException(Functions.constant((T)params.get(VALUE_ON_ERROR))) + .onSuccess(Functionals.chain( + SshValueFunctions.stdout(), + StringFunctions.trimEnd(), + TypeCoercions.function((Class<T>) sensor.getType()))) + .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup) + .logWarningGraceTime(logWarningGraceTime); + + SshFeed feed = SshFeed.builder() + .entity(entity) + .onlyIfServiceUp() + .poll(pollConfig) + .build(); + + entity.addFeed(feed); + + // Deprecated; kept for backwards compatibility with historic persisted state + new Supplier<Map<String,String>>() { @Override public Map<String, String> get() { if (entity == null) return ImmutableMap.of(); // See BROOKLYN-568 @@ -123,7 +153,8 @@ public final class SshCommandSensor<T> extends AbstractAddSensorFeed<T> { } }; - Supplier<String> commandSupplier = new Supplier<String>() { + // Deprecated; kept for backwards compatibility with historic persisted state + new Supplier<String>() { @Override public String get() { // Note that entity may be null during rebind (e.g. if this SshFeed is orphaned, with no associated entity): @@ -135,28 +166,12 @@ public final class SshCommandSensor<T> extends AbstractAddSensorFeed<T> { } }; - CommandPollConfig<T> pollConfig = new CommandPollConfig<T>(sensor) - .period(period) - .env(envSupplier) - .command(commandSupplier) - .suppressDuplicates(Boolean.TRUE.equals(suppressDuplicates)) - .checkSuccess(SshValueFunctions.exitStatusEquals(0)) - .onFailureOrException(Functions.constant((T)params.get(VALUE_ON_ERROR))) - .onSuccess(Functions.compose(new Function<String, T>() { - @Override - public T apply(String input) { - return TypeCoercions.coerce(Strings.trimEnd(input), (Class<T>) sensor.getType()); - }}, SshValueFunctions.stdout())) - .logWarningGraceTimeOnStartup(logWarningGraceTimeOnStartup) - .logWarningGraceTime(logWarningGraceTime); - - SshFeed feed = SshFeed.builder() - .entity(entity) - .onlyIfServiceUp() - .poll(pollConfig) - .build(); - - entity.addFeed(feed); + // Deprecated; kept for backwards compatibility with historic persisted state + new Function<String, T>() { + @Override public T apply(String input) { + return TypeCoercions.coerce(Strings.trimEnd(input), (Class<T>) sensor.getType()); + } + }; } @Beta @@ -183,4 +198,65 @@ public final class SshCommandSensor<T> extends AbstractAddSensorFeed<T> { return finalCommand; } + private static class EnvSupplier implements Supplier<Map<String,String>> { + private final Entity entity; + private final Object rawSensorShellEnv; + + EnvSupplier(Entity entity, ConfigBag params) { + this.entity = entity; + this.rawSensorShellEnv = params.getAllConfigRaw().getOrDefault(SENSOR_SHELL_ENVIRONMENT.getName(), SENSOR_SHELL_ENVIRONMENT.getDefaultValue()); + } + + @Override + public Map<String, String> get() { + if (entity == null) return ImmutableMap.of(); // See BROOKLYN-568 + + Map<String, Object> env = MutableMap.copyOf(entity.getConfig(BrooklynConfigKeys.SHELL_ENVIRONMENT)); + + // Add the shell environment entries from our configuration + if (rawSensorShellEnv != null) { + env.putAll(TypeCoercions.coerce(rawSensorShellEnv, new TypeToken<Map<String,Object>>() {})); + } + + // Try to resolve the configuration in the env Map + try { + env = (Map<String, Object>) Tasks.resolveDeepValue(env, Object.class, ((EntityInternal) entity).getExecutionContext()); + } catch (InterruptedException | ExecutionException e) { + Exceptions.propagateIfFatal(e); + } + + // Convert the environment into strings with the serializer + ShellEnvironmentSerializer serializer = new ShellEnvironmentSerializer(((EntityInternal) entity).getManagementContext()); + return serializer.serialize(env); + } + } + + private static class CommandSupplier implements Supplier<String> { + private final Entity entity; + private final Object rawSensorCommand; + private final Object rawSensorExecDir; + + CommandSupplier(Entity entity, ConfigBag params) { + this.entity = entity; + this.rawSensorCommand = params.getAllConfigRaw().get(SENSOR_COMMAND.getName()); + this.rawSensorExecDir = params.getAllConfigRaw().get(SENSOR_EXECUTION_DIR.getName()); + } + + @Override + public String get() { + // Note that entity may be null during rebind (e.g. if this SshFeed is orphaned, with no associated entity): + // See https://issues.apache.org/jira/browse/BROOKLYN-568. + // We therefore guard against null in makeCommandExecutingInDirectory. + ConfigBag params = ConfigBag.newInstance(); + if (rawSensorCommand != null) { + params.putStringKey(SENSOR_COMMAND.getName(), rawSensorCommand); + } + if (rawSensorExecDir != null) { + params.putStringKey(SENSOR_EXECUTION_DIR.getName(), rawSensorExecDir); + } + String command = Preconditions.checkNotNull(EntityInitializers.resolve(params, SENSOR_COMMAND)); + String dir = EntityInitializers.resolve(params, SENSOR_EXECUTION_DIR); + return makeCommandExecutingInDirectory(command, dir, entity); + } + } } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4901150/core/src/main/java/org/apache/brooklyn/util/core/flags/TypeCoercions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/util/core/flags/TypeCoercions.java b/core/src/main/java/org/apache/brooklyn/util/core/flags/TypeCoercions.java index a2dc398..3b3f874 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/flags/TypeCoercions.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/flags/TypeCoercions.java @@ -28,7 +28,6 @@ import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.api.sensor.Sensor; import org.apache.brooklyn.core.internal.BrooklynInitialization; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.core.mgmt.usage.UsageListener; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.util.JavaGroovyEquivalents; import org.apache.brooklyn.util.core.ClassLoaderUtils; @@ -93,7 +92,19 @@ public class TypeCoercions { } public static <T> Function<Object, T> function(final Class<T> type) { - return coercer.function(type); + return new CoerceFunction<T>(type); + } + + private static class CoerceFunction<T> implements Function<Object, T> { + private final Class<T> type; + + public CoerceFunction(Class<T> type) { + this.type = type; + } + @Override + public T apply(Object input) { + return coerce(input, type); + } } public static void registerDeprecatedBrooklynAdapters() { http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4901150/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestFixture.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestFixture.java b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestFixture.java index b80d857..b09aba6 100644 --- a/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestFixture.java +++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/rebind/RebindTestFixture.java @@ -19,9 +19,11 @@ package org.apache.brooklyn.core.mgmt.rebind; import java.io.File; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.function.Function; import org.apache.brooklyn.api.entity.Application; import org.apache.brooklyn.api.mgmt.ManagementContext; @@ -30,6 +32,7 @@ import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode; import org.apache.brooklyn.api.mgmt.rebind.RebindExceptionHandler; import org.apache.brooklyn.api.mgmt.rebind.RebindManager; import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoManifest; +import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityPredicates; import org.apache.brooklyn.core.entity.StartableApplication; @@ -41,6 +44,7 @@ import org.apache.brooklyn.core.mgmt.persist.FileBasedObjectStore; import org.apache.brooklyn.core.mgmt.persist.PersistMode; import org.apache.brooklyn.core.server.BrooklynServerConfig; import org.apache.brooklyn.util.core.task.BasicExecutionManager; +import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.os.Os; import org.apache.brooklyn.util.osgi.OsgiTestResources; import org.apache.brooklyn.util.repeat.Repeater; @@ -299,6 +303,24 @@ public abstract class RebindTestFixture<T extends StartableApplication> { } protected BrooklynMementoManifest loadMementoManifest() throws Exception { + return loadFromPersistedState((persister) -> { + RebindExceptionHandler exceptionHandler = new RecordingRebindExceptionHandler(RebindManager.RebindFailureMode.FAIL_AT_END, RebindManager.RebindFailureMode.FAIL_AT_END); + try { + return persister.loadMementoManifest(null, exceptionHandler); + } catch (IOException e) { + throw Exceptions.propagate(e); + } + }); + } + + protected BrooklynMementoRawData loadMementoRawData() throws Exception { + return loadFromPersistedState((persister) -> { + RebindExceptionHandler exceptionHandler = new RecordingRebindExceptionHandler(RebindManager.RebindFailureMode.FAIL_AT_END, RebindManager.RebindFailureMode.FAIL_AT_END); + return persister.loadMementoRawData(exceptionHandler); + }); + } + + protected <U> U loadFromPersistedState(Function<BrooklynMementoPersisterToObjectStore, U> loader) throws Exception { newManagementContext = createNewManagementContext(); FileBasedObjectStore objectStore = new FileBasedObjectStore(mementoDir); objectStore.injectManagementContext(newManagementContext); @@ -307,10 +329,13 @@ public abstract class RebindTestFixture<T extends StartableApplication> { objectStore, newManagementContext, classLoader); - RebindExceptionHandler exceptionHandler = new RecordingRebindExceptionHandler(RebindManager.RebindFailureMode.FAIL_AT_END, RebindManager.RebindFailureMode.FAIL_AT_END); - BrooklynMementoManifest mementoManifest = persister.loadMementoManifest(null, exceptionHandler); - persister.stop(false); - return mementoManifest; + U result; + try { + result = loader.apply(persister); + } finally { + persister.stop(false); + } + return result; } // protected void assertCatalogContains(BrooklynCatalog catalog, CatalogItem<?, ?> item) { http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4901150/utils/common/src/main/java/org/apache/brooklyn/util/guava/Functionals.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/guava/Functionals.java b/utils/common/src/main/java/org/apache/brooklyn/util/guava/Functionals.java index 18238d2..ef2400b 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/guava/Functionals.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/guava/Functionals.java @@ -135,6 +135,9 @@ public class Functionals { } public static <T> Predicate<T> predicate(final Function<T,Boolean> f) { + // Deprecated use of anonymous class (even though it's got a name, it's in a method). + // Kept for rebinding to historic persisted state only. + @SuppressWarnings({ "unused", "hiding" }) class FunctionAsPredicate implements Predicate<T> { @Override public boolean apply(T input) { @@ -145,7 +148,23 @@ public class Functionals { return "predicate("+f+")"; } } - return new FunctionAsPredicate(); + return new Functionals.FunctionAsPredicate<T>(f); + } + + static class FunctionAsPredicate<T> implements Predicate<T> { + private final Function<T, Boolean> f; + + FunctionAsPredicate(final Function<T,Boolean> f) { + this.f = f; + } + @Override + public boolean apply(T input) { + return f.apply(input); + } + @Override + public String toString() { + return "predicate("+f+")"; + } } /** http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d4901150/utils/common/src/main/java/org/apache/brooklyn/util/text/StringFunctions.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/text/StringFunctions.java b/utils/common/src/main/java/org/apache/brooklyn/util/text/StringFunctions.java index ddf1914..13f32fc 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/text/StringFunctions.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/text/StringFunctions.java @@ -366,6 +366,18 @@ public class StringFunctions { } } + public static Function<String, String> trimEnd() { + return new TrimEndFunction(); + } + + protected static class TrimEndFunction implements Function<String, String> { + @Override + public String apply(@Nullable String input) { + if (input == null) return null; + return Strings.trimEnd(input); + } + } + public static Function<String, String> toLowerCase() { return new LowerCaseFunction(); }