Repository: incubator-brooklyn Updated Branches: refs/heads/master 83b27b320 -> e08d3208b
Updates to Riak for Clocker Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d738df9a Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d738df9a Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d738df9a Branch: refs/heads/master Commit: d738df9a90ef73e5bece33fef97584b5d5d7dd4c Parents: 83b27b3 Author: Andrew Kennedy <[email protected]> Authored: Fri Mar 20 16:05:37 2015 +0000 Committer: Andrew Kennedy <[email protected]> Committed: Fri Mar 20 18:37:43 2015 +0000 ---------------------------------------------------------------------- .../entity/nosql/riak/RiakClusterImpl.java | 67 +++++------- .../brooklyn/entity/nosql/riak/RiakNode.java | 49 +++++---- .../entity/nosql/riak/RiakNodeDriver.java | 2 +- .../entity/nosql/riak/RiakNodeImpl.java | 9 +- .../entity/nosql/riak/RiakNodeSshDriver.java | 103 +++++++++++-------- .../nosql/riak/RiakClusterEc2LiveTest.java | 8 -- 6 files changed, 113 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java index ea1f1b9..079eada 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java @@ -24,8 +24,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import javax.annotation.Nullable; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +31,7 @@ import brooklyn.entity.Entity; import brooklyn.entity.basic.Attributes; import brooklyn.entity.basic.Entities; import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.basic.EntityPredicates; import brooklyn.entity.basic.Lifecycle; import brooklyn.entity.basic.ServiceStateLogic; import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic; @@ -47,7 +46,7 @@ import brooklyn.util.time.Time; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -73,14 +72,10 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER)); //FIXME: add a quorum to tolerate failed nodes before setting on fire. - Optional<Entity> anyNode = Iterables.tryFind(getMembers(), new Predicate<Entity>() { - - @Override - public boolean apply(@Nullable Entity entity) { - return (entity instanceof RiakNode && hasMemberJoinedCluster(entity) && entity.getAttribute(RiakNode.SERVICE_UP)); - } - }); - + Optional<Entity> anyNode = Iterables.tryFind(getMembers(), Predicates.and( + Predicates.instanceOf(RiakNode.class), + EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true), + EntityPredicates.attributeEqualTo(RiakNode.SERVICE_UP, true))); if (anyNode.isPresent()) { log.info("Planning and Committing cluster changes on node: {}, cluster: {}", anyNode.get().getId(), getId()); Entities.invokeEffector(this, anyNode.get(), RiakNode.COMMIT_RIAK_CLUSTER).blockUntilEnded(); @@ -103,7 +98,7 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { .configure("group", this)); } - protected void onServerPoolMemberChanged(Entity member) { + protected void onServerPoolMemberChanged(final Entity member) { synchronized (mutex) { log.trace("For {}, considering membership of {} which is in locations {}", new Object[]{ this, member, member.getLocations() }); @@ -132,50 +127,40 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { } else { // TODO: be wary of erroneous nodes but are still flagged 'in cluster' // add the new node to be part of the riak cluster. - Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() { - @Override - public boolean apply(@Nullable Entity node) { - return (node instanceof RiakNode && hasMemberJoinedCluster(node)); - } - }); - + Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and( + Predicates.instanceOf(RiakNode.class), + EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true))); if (anyNodeInCluster.isPresent()) { - if (!nodes.containsKey(member) && !hasMemberJoinedCluster(member)) { + if (!nodes.containsKey(member) && member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER) == null) { String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME); Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName); if (getAttribute(IS_CLUSTER_INIT)) { - Entities.invokeEffector(RiakClusterImpl.this, anyNodeInCluster.get(), RiakNode.COMMIT_RIAK_CLUSTER); + Entities.invokeEffector(RiakClusterImpl.this, member, RiakNode.COMMIT_RIAK_CLUSTER); } nodes.put(member, riakName); setAttribute(RIAK_CLUSTER_NODES, nodes); - log.info("Adding riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) }); + log.info("Added Riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) }); } } else { - log.error("isFirstNodeSet , but no cluster members found to add {}", member.getId()); + log.error("isFirstNodeSet, but no cluster members found to add {}", member.getId()); } } } else { if (nodes != null && nodes.containsKey(member)) { - final Entity memberToBeRemoved = member; - - Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), new Predicate<Entity>() { - @Override - public boolean apply(@Nullable Entity node) { - return (node instanceof RiakNode && hasMemberJoinedCluster(node) && !node.equals(memberToBeRemoved)); - } - }); + Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and( + Predicates.instanceOf(RiakNode.class), + EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true), + Predicates.not(Predicates.equalTo(member)))); if (anyNodeInCluster.isPresent()) { - Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(memberToBeRemoved)); + Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.LEAVE_RIAK_CLUSTER, getRiakName(member)); } - nodes.remove(member); setAttribute(RIAK_CLUSTER_NODES, nodes); - log.info("Removing riak node {}: {}; {} from cluster", new Object[]{this, member, getRiakName(member)}); + log.info("Removed Riak node {}: {}; {} from cluster", new Object[]{ this, member, getRiakName(member) }); } } ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES); - if (log.isTraceEnabled()) log.trace("Done {} checkEntity {}", this, member); calculateClusterAddresses(); } @@ -194,16 +179,14 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { protected boolean belongsInServerPool(Entity member) { if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) { - if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, eliminating because not up", this, member); + log.trace("Members of {}, checking {}, eliminating because not up", this, member); return false; } if (!getMembers().contains(member)) { - if (log.isTraceEnabled()) - log.trace("Members of {}, checking {}, eliminating because not member", this, member); - + log.trace("Members of {}, checking {}, eliminating because not member", this, member); return false; } - if (log.isTraceEnabled()) log.trace("Members of {}, checking {}, approving", this, member); + log.trace("Members of {}, checking {}, approving", this, member); return true; } @@ -212,10 +195,6 @@ public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { return node.getAttribute(RiakNode.RIAK_NODE_NAME); } - private boolean hasMemberJoinedCluster(Entity member) { - return ((RiakNode) member).hasJoinedCluster(); - } - public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { @Override protected void onEntityEvent(EventType type, Entity entity) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java index ef9556d..c1f1bf8 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java @@ -20,11 +20,6 @@ package brooklyn.entity.nosql.riak; import java.util.List; -import brooklyn.entity.basic.Attributes; -import brooklyn.event.basic.AttributeSensorAndConfigKey; -import brooklyn.event.basic.TemplatedStringAttributeSensorAndConfigKey; -import com.google.common.reflect.TypeToken; - import brooklyn.catalog.Catalog; import brooklyn.config.ConfigKey; import brooklyn.entity.annotation.Effector; @@ -34,10 +29,14 @@ import brooklyn.entity.basic.MethodEffector; import brooklyn.entity.basic.SoftwareProcess; import brooklyn.entity.proxying.ImplementedBy; import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.AttributeSensorAndConfigKey; import brooklyn.event.basic.PortAttributeSensorAndConfigKey; import brooklyn.event.basic.Sensors; +import brooklyn.event.basic.TemplatedStringAttributeSensorAndConfigKey; import brooklyn.util.flags.SetFromFlag; +import com.google.common.reflect.TypeToken; + @Catalog(name="Riak Node", description="Riak is a distributed NoSQL key-value data store that offers " + "extremely high availability, fault tolerance, operational simplicity and scalability.") @ImplementedBy(RiakNodeImpl.class) @@ -45,8 +44,10 @@ public interface RiakNode extends SoftwareProcess { @SetFromFlag("version") ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, - "Version to install. Example 2.0.2, 2.0.5", - "2.0.5"); + "Version to install (Default 2.0.5)", "2.0.5"); + + @SetFromFlag("optimizeNetworking") + ConfigKey<Boolean> OPTIMIZE_HOST_NETWORKING = ConfigKeys.newBooleanConfigKey("riak.networking.optimize", "Optimize host networking when running in a VM", Boolean.TRUE); // vm.args and app.config are used for pre-version 2.0.0. Later versions use the (simplified) riak.conf // see https://github.com/joedevivo/ricon/blob/master/cuttlefish.md @@ -94,14 +95,19 @@ public interface RiakNode extends SoftwareProcess { @SetFromFlag("riakWebPort") PortAttributeSensorAndConfigKey RIAK_WEB_PORT = new PortAttributeSensorAndConfigKey("riak.webPort", "Riak Web Port", "8098+"); - @SetFromFlag("riakNodeHasJoinedCluster") + @SetFromFlag("riakPbPort") + PortAttributeSensorAndConfigKey RIAK_PB_PORT = new PortAttributeSensorAndConfigKey("riak.pbPort", "Riak Protocol Buffers Port", "8087+"); + + AttributeSensor<Boolean> RIAK_PACKAGE_INSTALL = Sensors.newBooleanSensor( + "riak.install.package", "Flag to indicate whether Riak was installed using an OS package"); + AttributeSensor<Boolean> RIAK_ON_PATH = Sensors.newBooleanSensor( + "riak.install.onPath", "Flag to indicate whether Riak is available on the PATH"); + AttributeSensor<Boolean> RIAK_NODE_HAS_JOINED_CLUSTER = Sensors.newBooleanSensor( - "riak.node.riakNodeHasJoinedCluster", "Flag to indicate wether the Riak node has joined a cluster member"); + "riak.node.riakNodeHasJoinedCluster", "Flag to indicate whether the Riak node has joined a cluster member"); - @SetFromFlag("riakNodeName") AttributeSensor<String> RIAK_NODE_NAME = Sensors.newStringSensor("riak.node", "Returns the riak node name as defined in vm.args"); - @SetFromFlag("riakPbPort") - PortAttributeSensorAndConfigKey RIAK_PB_PORT = new PortAttributeSensorAndConfigKey("riak.pbPort", "Riak Protocol Buffers Port", "8087+"); + // these needed for nodes to talk to each other, but not clients (so ideally set up in the security group for internal access) PortAttributeSensorAndConfigKey HANDOFF_LISTENER_PORT = new PortAttributeSensorAndConfigKey("riak.handoffListenerPort", "Handoff Listener Port", "8099+"); PortAttributeSensorAndConfigKey EPMD_LISTENER_PORT = new PortAttributeSensorAndConfigKey("riak.epmdListenerPort", "Erlang Port Mapper Daemon Listener Port", "4369"); @@ -109,6 +115,7 @@ public interface RiakNode extends SoftwareProcess { PortAttributeSensorAndConfigKey ERLANG_PORT_RANGE_END = new PortAttributeSensorAndConfigKey("riak.erlangPortRangeEnd", "Erlang Port Range End", "7999+"); PortAttributeSensorAndConfigKey SEARCH_SOLR_PORT = new PortAttributeSensorAndConfigKey("riak.search.solr.port", "Solr port", "8093+"); PortAttributeSensorAndConfigKey SEARCH_SOLR_JMX_PORT = new PortAttributeSensorAndConfigKey("riak.search.solr.jmx_port", "Solr port", "8985+"); + AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("node.gets"); AttributeSensor<Integer> NODE_GETS_TOTAL = Sensors.newIntegerSensor("node.gets.total"); AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("node.puts"); @@ -130,9 +137,10 @@ public interface RiakNode extends SoftwareProcess { @SuppressWarnings("serial") AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new TypeToken<List<String>>() {}, "ring.members", "all the riak nodes in the ring"); - public static final MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "joinCluster"); - public static final MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "leaveCluster"); - public static final MethodEffector<Void> COMMIT_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "commitCluster"); + + MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "joinCluster"); + MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "leaveCluster"); + MethodEffector<Void> COMMIT_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "commitCluster"); // accessors, for use from template file Integer getRiakWebPort(); @@ -157,17 +165,16 @@ public interface RiakNode extends SoftwareProcess { String getOsMajorVersion(); - @Effector(description = "add this riak node to the riak cluster") + @Effector(description = "Add this riak node to the Riak cluster") public void joinCluster(@EffectorParam(name = "nodeName") String nodeName); - @Effector(description = "remove this riak node from the cluster") - public void leaveCluster(); + @Effector(description = "Remove this Riak node from the cluster") + public void leaveCluster(@EffectorParam(name = "nodeName") String nodeName); - @Effector(description = "recover a failed riak node and join it back to the cluster (by passing it a working node on the cluster 'node')") + @Effector(description = "Recover a failed Riak node and join it back to the cluster (by passing it a working node on the cluster 'node')") public void recoverFailedNode(@EffectorParam(name = "nodeName") String nodeName); - @Effector(description = "commit changes made to a Riak cluster") + @Effector(description = "Commit changes made to a Riak cluster") public void commitCluster(); - public boolean hasJoinedCluster(); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java index 3669c1c..b81b7fc 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java @@ -26,7 +26,7 @@ public interface RiakNodeDriver extends SoftwareProcessDriver { public void joinCluster(String nodeName); - public void leaveCluster(); + public void leaveCluster(String nodeName); public void recoverFailedNode(String nodeName); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java index 73bb272..0667a7a 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java @@ -181,8 +181,8 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode { } @Override - public void leaveCluster() { - getDriver().leaveCluster(); + public void leaveCluster(String nodeName) { + getDriver().leaveCluster(nodeName); } @Override @@ -191,11 +191,6 @@ public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode { } @Override - public boolean hasJoinedCluster() { - return Boolean.TRUE.equals(getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER)); - } - - @Override public void recoverFailedNode(String nodeName) { getDriver().recoverFailedNode(nodeName); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java index 00e304f..544c39a 100644 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java +++ b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java @@ -26,6 +26,7 @@ import java.util.Map; import brooklyn.util.ssh.BashCommands; import brooklyn.util.task.ssh.SshTasks; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +44,7 @@ import brooklyn.util.task.DynamicTasks; import brooklyn.util.text.Strings; import com.google.common.base.Joiner; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -53,8 +55,6 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen private static final Logger LOG = LoggerFactory.getLogger(RiakNodeSshDriver.class); private static final String sbinPath = "$PATH:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"; private static final String INSTALLING_FALLBACK = INSTALLING + "_fallback"; - private boolean isPackageInstall = true; - private boolean isRiakOnPath = true; public RiakNodeSshDriver(final RiakNodeImpl entity, final SshMachineLocation machine) { super(entity, machine); @@ -97,8 +97,9 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen } else { commands.addAll(installFromPackageCloud()); } + entity.setAttribute(RiakNode.RIAK_PACKAGE_INSTALL, true); } else if (osDetails.isMac()) { - isPackageInstall = false; + entity.setAttribute(RiakNode.RIAK_PACKAGE_INSTALL, false); commands.addAll(installMac()); } else if (osDetails.isWindows()) { throw new UnsupportedOperationException("RiakNode not supported on Windows instances"); @@ -123,6 +124,8 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen .execute(); } } + + checkRiakOnPath(); } private List<String> installLinuxFromPackageUrl(String expandedInstallDir) { @@ -217,8 +220,6 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen //create entity's runDir newScript(CUSTOMIZING).execute(); - isRiakOnPath = isPackageInstall ? isRiakOnPath() : true; - OsDetails osDetails = getMachine().getMachineDetails().getOsDetails(); List<String> commands = Lists.newLinkedList(); @@ -253,7 +254,21 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen commands.add(sudo("chown -R riak:riak " + getRiakEtcDir())); } - if(osDetails.isLinux()) { + // TODO platform_*_dir + // TODO riak config log + + ScriptHelper customizeScript = newScript(CUSTOMIZING) + .failOnNonZeroResultCode() + .body.append(commands); + + if (!isRiakOnPath()) { + Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath); + log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable); + customizeScript.environmentVariablesReset(newPathVariable); + } + customizeScript.failOnNonZeroResultCode().execute(); + + if (osDetails.isLinux()) { ImmutableMap<String, String> sysctl = ImmutableMap.<String, String>builder() .put("vm.swappiness", "0") .put("net.core.somaxconn", "40000") @@ -266,23 +281,14 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen .put("net.ipv4.tcp_moderate_rcvbuf", "1") .build(); - // TODO platform_*_dir - // TODO riak config log + ScriptHelper optimize = newScript(CUSTOMIZING + "network") + .body.append(sudo("sysctl " + Joiner.on(' ').withKeyValueSeparator("=").join(sysctl))); - commands.add( sudo("sysctl " + Joiner.on(' ').withKeyValueSeparator("=").join(sysctl))); + Optional<Boolean> enable = Optional.fromNullable(entity.getConfig(RiakNode.OPTIMIZE_HOST_NETWORKING)); + if (!enable.isPresent()) optimize.inessential(); + if (enable.or(true)) optimize.execute(); } - ScriptHelper customizeScript = newScript(CUSTOMIZING) - .failOnNonZeroResultCode() - .body.append(commands); - - if (!isRiakOnPath) { - Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath); - log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable); - customizeScript.environmentVariablesReset(newPathVariable); - } - customizeScript.failOnNonZeroResultCode().execute(); - //set the riak node name entity.setAttribute(RiakNode.RIAK_NODE_NAME, format("riak@%s", getSubnetHostname())); } @@ -290,7 +296,8 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen @Override public void launch() { List<String> commands = Lists.newLinkedList(); - if (isPackageInstall) { + + if (isPackageInstall()) { commands.add(addSbinPathCommand()); commands.add(sudo("service riak start")); } else { @@ -303,7 +310,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen ScriptHelper launchScript = newScript(LAUNCHING) .body.append(commands); - if (!isRiakOnPath) { + if (!isRiakOnPath()) { Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath); log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable); launchScript.environmentVariablesReset(newPathVariable); @@ -313,15 +320,15 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen @Override public void stop() { - leaveCluster(); + leaveCluster(""); String command = format("%s stop", getRiakCmd()); - command = isPackageInstall ? sudo(command) : command; + command = isPackageInstall() ? sudo(command) : command; ScriptHelper stopScript = newScript(ImmutableMap.of(USE_PID_FILE, false), STOPPING) .body.append(command); - if (!isRiakOnPath) { + if (!isRiakOnPath()) { Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath); log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable); stopScript.environmentVariablesReset(newPathVariable); @@ -339,7 +346,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen ScriptHelper checkRunningScript = newScript(CHECK_RUNNING) .body.append(sudo(format("%s ping", getRiakCmd()))); - if (!isRiakOnPath) { + if (!isRiakOnPath()) { Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath); log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable); checkRunningScript.environmentVariablesReset(newPathVariable); @@ -347,16 +354,24 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen return (checkRunningScript.execute() == 0); } + public boolean isPackageInstall() { + return entity.getAttribute(RiakNode.RIAK_PACKAGE_INSTALL); + } + + public boolean isRiakOnPath() { + return entity.getAttribute(RiakNode.RIAK_ON_PATH); + } + public String getRiakEtcDir() { - return isPackageInstall ? "/etc/riak" : Urls.mergePaths(getExpandedInstallDir(), "etc"); + return isPackageInstall() ? "/etc/riak" : Urls.mergePaths(getExpandedInstallDir(), "etc"); } protected String getRiakCmd() { - return isPackageInstall ? "riak" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak"); + return isPackageInstall() ? "riak" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak"); } protected String getRiakAdminCmd() { - return isPackageInstall ? "riak-admin" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak-admin"); + return isPackageInstall() ? "riak-admin" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak-admin"); } @Override @@ -372,7 +387,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen .body.append(sudo(format("%s cluster join %s", getRiakAdminCmd(), nodeName))) .failOnNonZeroResultCode(); - if (!isRiakOnPath) { + if (!isRiakOnPath()) { Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath); log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable); joinClusterScript.environmentVariablesReset(newPathVariable); @@ -388,19 +403,19 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen } @Override - public void leaveCluster() { + public void leaveCluster(String nodeName) { //TODO: add 'riak-admin cluster force-remove' for erroneous and unrecoverable nodes. //FIXME: find a way to batch commit the changes, instead of committing for every operation. //FIXME: find a way to check if the node is the last in the cluster to avoid removing the only member and getting "last node error" if (hasJoinedCluster()) { ScriptHelper leaveClusterScript = newScript("leaveCluster") - .body.append(sudo(format("%s cluster leave", getRiakAdminCmd()))) + .body.append(sudo(format("%s cluster leave %s", getRiakAdminCmd(), nodeName))) .body.append(sudo(format("%s cluster plan", getRiakAdminCmd()))) .body.append(sudo(format("%s cluster commit", getRiakAdminCmd()))) .failOnNonZeroResultCode(); - if (!isRiakOnPath) { + if (!isRiakOnPath()) { Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath); log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable); leaveClusterScript.environmentVariablesReset(newPathVariable); @@ -422,7 +437,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen .body.append(sudo(format("%s cluster commit", getRiakAdminCmd()))) .failOnNonZeroResultCode(); - if (!isRiakOnPath) { + if (!isRiakOnPath()) { Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath); log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable); commitClusterScript.environmentVariablesReset(newPathVariable); @@ -436,7 +451,6 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen @Override public void recoverFailedNode(String nodeName) { - //TODO find ways to detect a faulty/failed node //argument passed 'node' is any working node in the riak cluster //following the instruction from: http://docs.basho.com/riak/latest/ops/running/recovery/failed-node/ @@ -446,10 +460,10 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen String stopCommand = format("%s stop", getRiakCmd()); - stopCommand = isPackageInstall ? sudo(stopCommand) : stopCommand; + stopCommand = isPackageInstall() ? sudo(stopCommand) : stopCommand; - String startCommand = format("%s start >/dev/null 2>&1 < /dev/null &", getRiakCmd()); - startCommand = isPackageInstall ? sudo(startCommand) : startCommand; + String startCommand = format("%s start > /dev/null 2>&1 < /dev/null &", getRiakCmd()); + startCommand = isPackageInstall() ? sudo(startCommand) : startCommand; ScriptHelper recoverNodeScript = newScript("recoverNode") .body.append(stopCommand) @@ -461,7 +475,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen .body.append(sudo(format("%s cluster commit", getRiakAdminCmd()))) .failOnNonZeroResultCode(); - if (!isRiakOnPath) { + if (!isRiakOnPath()) { Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath); log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable); recoverNodeScript.environmentVariablesReset(newPathVariable); @@ -475,13 +489,14 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen } private Boolean hasJoinedCluster() { - return ((RiakNode) entity).hasJoinedCluster(); + return Boolean.TRUE.equals(entity.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER)); } - protected boolean isRiakOnPath() { - return (newScript("riakOnPath") + protected void checkRiakOnPath() { + boolean riakOnPath = newScript("riakOnPath") .body.append("which riak") - .execute() == 0); + .execute() == 0; + entity.setAttribute(RiakNode.RIAK_ON_PATH, riakOnPath); } private String getRiakName() { @@ -490,7 +505,7 @@ public class RiakNodeSshDriver extends AbstractSoftwareProcessSshDriver implemen private String getRingStateDir() { //TODO: check for non-package install. - return isPackageInstall ? "/var/lib/riak/ring" : Urls.mergePaths(getExpandedInstallDir(), "lib/ring"); + return isPackageInstall() ? "/var/lib/riak/ring" : Urls.mergePaths(getExpandedInstallDir(), "lib/ring"); } protected boolean isVersion1() { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d738df9a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java index ca0dd05..3f9e7d9 100644 --- a/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java +++ b/software/nosql/src/test/java/brooklyn/entity/nosql/riak/RiakClusterEc2LiveTest.java @@ -26,11 +26,8 @@ import brooklyn.entity.AbstractEc2LiveTest; import brooklyn.entity.basic.Attributes; import brooklyn.entity.proxying.EntitySpec; import brooklyn.location.Location; -import brooklyn.test.Asserts; import brooklyn.test.EntityTestUtils; -import com.google.common.base.Predicates; -import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -60,11 +57,6 @@ public class RiakClusterEc2LiveTest extends AbstractEc2LiveTest { for (final RiakNode node : nodes) { EntityTestUtils.assertAttributeEqualsEventually(node, RiakNode.SERVICE_UP, true); EntityTestUtils.assertAttributeEqualsEventually(node, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true); - Asserts.eventually(new Supplier<Boolean>() { - @Override public Boolean get() { - return node.hasJoinedCluster(); - } - }, Predicates.alwaysTrue()); } }
