http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java deleted file mode 100644 index 7b256c0..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakClusterImpl.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.riak; - -import static brooklyn.util.JavaGroovyEquivalents.groovyTruth; - -import java.net.URI; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.enricher.Enrichers; -import brooklyn.entity.Entity; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.basic.EntityPredicates; -import brooklyn.entity.basic.Lifecycle; -import brooklyn.entity.basic.ServiceStateLogic; -import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic; -import brooklyn.entity.group.AbstractMembershipTrackingPolicy; -import brooklyn.entity.group.DynamicClusterImpl; -import brooklyn.entity.proxying.EntitySpec; -import brooklyn.entity.trait.Startable; -import brooklyn.event.AttributeSensor; -import brooklyn.event.basic.DependentConfiguration; -import brooklyn.policy.EnricherSpec; -import brooklyn.policy.PolicySpec; -import brooklyn.util.task.Tasks; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -public class RiakClusterImpl extends DynamicClusterImpl implements RiakCluster { - - private static final Logger log = LoggerFactory.getLogger(RiakClusterImpl.class); - - private transient Object mutex = new Object[0]; - - public void init() { - super.init(); - log.info("Initializing the riak cluster..."); - setAttribute(IS_CLUSTER_INIT, false); - } - - @Override - protected void doStart() { - super.doStart(); - connectSensors(); - - try { - Duration delay = getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER); - Tasks.setBlockingDetails("Sleeping for "+delay+" before advertising cluster available"); - Time.sleep(delay); - } finally { - Tasks.resetBlockingDetails(); - } - - //FIXME: add a quorum to tolerate failed nodes before setting on fire. - @SuppressWarnings("unchecked") - Optional<Entity> anyNode = Iterables.tryFind(getMembers(), Predicates.and( - Predicates.instanceOf(RiakNode.class), - EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true), - EntityPredicates.attributeEqualTo(RiakNode.SERVICE_UP, true))); - if (anyNode.isPresent()) { - setAttribute(IS_CLUSTER_INIT, true); - } else { - log.warn("No Riak Nodes are found on the cluster: {}. Initialization Failed", getId()); - ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE); - } - } - - protected EntitySpec<?> getMemberSpec() { - EntitySpec<?> result = config().get(MEMBER_SPEC); - if (result!=null) return result; - return EntitySpec.create(RiakNode.class); - } - - protected void connectSensors() { - addPolicy(PolicySpec.create(MemberTrackingPolicy.class) - .displayName("Controller targets tracker") - .configure("sensorsToTrack", ImmutableSet.of(RiakNode.SERVICE_UP)) - .configure("group", this)); - - EnricherSpec<?> first = Enrichers.builder() - .aggregating(Attributes.MAIN_URI) - .publishing(Attributes.MAIN_URI) - .computing(new Function<Collection<URI>,URI>() { - @Override - public URI apply(Collection<URI> input) { - return input.iterator().next(); - } }) - .fromMembers() - .build(); - addEnricher(first); - - Map<? extends AttributeSensor<? extends Number>, ? extends AttributeSensor<? extends Number>> enricherSetup = - ImmutableMap.<AttributeSensor<? extends Number>, AttributeSensor<? extends Number>>builder() - .put(RiakNode.NODE_PUTS, RiakCluster.NODE_PUTS_1MIN_PER_NODE) - .put(RiakNode.NODE_GETS, RiakCluster.NODE_GETS_1MIN_PER_NODE) - .put(RiakNode.NODE_OPS, RiakCluster.NODE_OPS_1MIN_PER_NODE) - .build(); - // construct sum and average over cluster - for (AttributeSensor<? extends Number> nodeSensor : enricherSetup.keySet()) { - addSummingMemberEnricher(nodeSensor); - addAveragingMemberEnricher(nodeSensor, enricherSetup.get(nodeSensor)); - } - } - - private void addAveragingMemberEnricher(AttributeSensor<? extends Number> fromSensor, AttributeSensor<? extends Number> toSensor) { - addEnricher(Enrichers.builder() - .aggregating(fromSensor) - .publishing(toSensor) - .fromMembers() - .computingAverage() - .build() - ); - } - - private void addSummingMemberEnricher(AttributeSensor<? extends Number> source) { - addEnricher(Enrichers.builder() - .aggregating(source) - .publishing(source) - .fromMembers() - .computingSum() - .build() - ); - } - - protected void onServerPoolMemberChanged(final Entity member) { - synchronized (mutex) { - log.trace("For {}, considering membership of {} which is in locations {}", new Object[]{ this, member, member.getLocations() }); - - Map<Entity, String> nodes = getAttribute(RIAK_CLUSTER_NODES); - if (belongsInServerPool(member)) { - // TODO can we discover the nodes by asking the riak cluster, rather than assuming what we add will be in there? - // TODO and can we do join as part of node starting? - - if (nodes == null) { - nodes = Maps.newLinkedHashMap(); - } - String riakName = getRiakName(member); - Preconditions.checkNotNull(riakName); - - // flag a first node to be the first node in the riak cluster. - Boolean firstNode = getAttribute(IS_FIRST_NODE_SET); - if (!Boolean.TRUE.equals(firstNode)) { - setAttribute(IS_FIRST_NODE_SET, Boolean.TRUE); - - nodes.put(member, riakName); - setAttribute(RIAK_CLUSTER_NODES, nodes); - - ((EntityInternal) member).setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE); - - log.info("Added initial Riak node {}: {}; {} to new cluster", new Object[] { this, member, getRiakName(member) }); - } else { - // TODO: be wary of erroneous nodes but are still flagged 'in cluster' - // add the new node to be part of the riak cluster. - Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and( - Predicates.instanceOf(RiakNode.class), - EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true))); - if (anyNodeInCluster.isPresent()) { - if (!nodes.containsKey(member) && member.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER) == null) { - String anyNodeName = anyNodeInCluster.get().getAttribute(RiakNode.RIAK_NODE_NAME); - Entities.invokeEffectorWithArgs(this, member, RiakNode.JOIN_RIAK_CLUSTER, anyNodeName).blockUntilEnded(); - nodes.put(member, riakName); - setAttribute(RIAK_CLUSTER_NODES, nodes); - log.info("Added Riak node {}: {}; {} to cluster", new Object[] { this, member, getRiakName(member) }); - } - } else { - log.error("isFirstNodeSet, but no cluster members found to add {}", member.getId()); - } - } - } else { - if (nodes != null && nodes.containsKey(member)) { - DependentConfiguration.attributeWhenReady(member, RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Predicates.equalTo(false)).blockUntilEnded(Duration.TWO_MINUTES); - @SuppressWarnings("unchecked") - Optional<Entity> anyNodeInCluster = Iterables.tryFind(nodes.keySet(), Predicates.and( - Predicates.instanceOf(RiakNode.class), - EntityPredicates.attributeEqualTo(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, true), - Predicates.not(Predicates.equalTo(member)))); - if (anyNodeInCluster.isPresent()) { - Entities.invokeEffectorWithArgs(this, anyNodeInCluster.get(), RiakNode.REMOVE_FROM_CLUSTER, getRiakName(member)).blockUntilEnded(); - } - nodes.remove(member); - setAttribute(RIAK_CLUSTER_NODES, nodes); - log.info("Removed Riak node {}: {}; {} from cluster", new Object[]{ this, member, getRiakName(member) }); - } - } - - ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyMap(this, RIAK_CLUSTER_NODES); - - calculateClusterAddresses(); - } - } - - private void calculateClusterAddresses() { - List<String> addresses = Lists.newArrayList(); - List<String> addressesPbPort = Lists.newArrayList(); - for (Entity entity : this.getMembers()) { - if (entity instanceof RiakNode && entity.getAttribute(Attributes.SERVICE_UP)) { - RiakNode riakNode = (RiakNode) entity; - addresses.add(riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute(RiakNode.RIAK_WEB_PORT)); - addressesPbPort.add(riakNode.getAttribute(Attributes.SUBNET_HOSTNAME) + ":" + riakNode.getAttribute(RiakNode.RIAK_PB_PORT)); - } - } - setAttribute(RiakCluster.NODE_LIST, Joiner.on(",").join(addresses)); - setAttribute(RiakCluster.NODE_LIST_PB_PORT, Joiner.on(",").join(addressesPbPort)); - } - - protected boolean belongsInServerPool(Entity member) { - if (!groovyTruth(member.getAttribute(Startable.SERVICE_UP))) { - log.trace("Members of {}, checking {}, eliminating because not up", this, member); - return false; - } - if (!getMembers().contains(member)) { - log.trace("Members of {}, checking {}, eliminating because not member", this, member); - return false; - } - log.trace("Members of {}, checking {}, approving", this, member); - - return true; - } - - private String getRiakName(Entity node) { - return node.getAttribute(RiakNode.RIAK_NODE_NAME); - } - - public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { - @Override - protected void onEntityEvent(EventType type, Entity entity) { - ((RiakClusterImpl) super.entity).onServerPoolMemberChanged(entity); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java deleted file mode 100644 index fa2bbbb..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNode.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.riak; - -import java.net.URI; -import java.util.List; - -import org.apache.brooklyn.catalog.Catalog; -import brooklyn.config.ConfigKey; -import brooklyn.entity.annotation.Effector; -import brooklyn.entity.annotation.EffectorParam; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.MethodEffector; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.java.UsesJava; -import brooklyn.entity.proxying.ImplementedBy; -import brooklyn.event.AttributeSensor; -import brooklyn.event.basic.AttributeSensorAndConfigKey; -import brooklyn.event.basic.PortAttributeSensorAndConfigKey; -import brooklyn.event.basic.Sensors; -import brooklyn.util.flags.SetFromFlag; - -import com.google.common.collect.ImmutableList; -import com.google.common.reflect.TypeToken; - -@Catalog(name="Riak Node", description="Riak is a distributed NoSQL key-value data store that offers " - + "extremely high availability, fault tolerance, operational simplicity and scalability.") -@ImplementedBy(RiakNodeImpl.class) -public interface RiakNode extends SoftwareProcess, UsesJava { - - @SetFromFlag("version") - ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, - "Version to install (Default 2.0.5)", "2.0.5"); - - @SetFromFlag("optimizeNetworking") - ConfigKey<Boolean> OPTIMIZE_HOST_NETWORKING = ConfigKeys.newBooleanConfigKey("riak.networking.optimize", "Optimize host networking when running in a VM", Boolean.TRUE); - - // vm.args and app.config are used for pre-version 2.0.0. Later versions use the (simplified) riak.conf - // see https://github.com/joedevivo/ricon/blob/master/cuttlefish.md - @SetFromFlag("vmArgsTemplateUrl") - ConfigKey<String> RIAK_VM_ARGS_TEMPLATE_URL = ConfigKeys.newStringConfigKey( - "riak.vmArgs.templateUrl", "Template file (in freemarker format) for the vm.args config file", - "classpath://brooklyn/entity/nosql/riak/vm.args"); - @SetFromFlag("appConfigTemplateUrl") - ConfigKey<String> RIAK_APP_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey( - "riak.appConfig.templateUrl", "Template file (in freemarker format) for the app.config config file", - "classpath://brooklyn/entity/nosql/riak/app.config"); - @SetFromFlag("appConfigTemplateUrlLinux") - ConfigKey<String> RIAK_CONF_TEMPLATE_URL_LINUX = ConfigKeys.newStringConfigKey( - "riak.riakConf.templateUrl.linux", "Template file (in freemarker format) for the app.config config file", - "classpath://brooklyn/entity/nosql/riak/riak.conf"); - @SetFromFlag("appConfigTemplateUrlMac") - ConfigKey<String> RIAK_CONF_TEMPLATE_URL_MAC = ConfigKeys.newStringConfigKey( - "riak.riakConf.templateUrl.mac", "Template file (in freemarker format) for the app.config config file", - "classpath://brooklyn/entity/nosql/riak/riak-mac.conf"); - - ConfigKey<String> RIAK_CONF_ADDITIONAL_CONTENT = ConfigKeys.newStringConfigKey( - "riak.riakConf.additionalContent", "Template file (in freemarker format) for setting up additional settings in the riak.conf file", ""); - - // maxOpenFiles' default value (65536) is based on the Basho's recommendation - http://docs.basho.com/riak/latest/ops/tuning/open-files-limit/ - @SetFromFlag("maxOpenFiles") - ConfigKey<Integer> RIAK_MAX_OPEN_FILES = ConfigKeys.newIntegerConfigKey( - "riak.max.open.files", "Number of the open files required by Riak", 65536); - - @SetFromFlag("downloadUrlRhelCentos") - AttributeSensorAndConfigKey<String, String> DOWNLOAD_URL_RHEL_CENTOS = ConfigKeys.newTemplateSensorAndConfigKey("download.url.rhelcentos", - "URL pattern for downloading the linux RPM installer (will substitute things like ${version} automatically)", - "http://s3.amazonaws.com/downloads.basho.com/riak/${entity.majorVersion}/${entity.fullVersion}/rhel/" + - "${entity.osMajorVersion}/riak-${entity.fullVersion}-1.el${entity.osMajorVersion}.x86_64.rpm"); - - @SetFromFlag("downloadUrlUbuntu") - AttributeSensorAndConfigKey<String, String> DOWNLOAD_URL_UBUNTU = ConfigKeys.newTemplateSensorAndConfigKey("download.url.ubuntu", - "URL pattern for downloading the linux Ubuntu installer (will substitute things like ${version} automatically)", - "http://s3.amazonaws.com/downloads.basho.com/riak/${entity.majorVersion}/${entity.fullVersion}/ubuntu/" + - "$OS_RELEASE/riak_${entity.fullVersion}-1_amd64.deb"); - - @SetFromFlag("downloadUrlDebian") - AttributeSensorAndConfigKey<String, String> DOWNLOAD_URL_DEBIAN = ConfigKeys.newTemplateSensorAndConfigKey("download.url.debian", - "URL pattern for downloading the linux Debian installer (will substitute things like ${version} automatically)", - "http://s3.amazonaws.com/downloads.basho.com/riak/${entity.majorVersion}/${entity.fullVersion}/debian/" + - "$OS_RELEASE/riak_${entity.fullVersion}-1_amd64.deb"); - - @SetFromFlag("downloadUrlMac") - AttributeSensorAndConfigKey<String, String> DOWNLOAD_URL_MAC = ConfigKeys.newTemplateSensorAndConfigKey("download.url.mac", - "URL pattern for downloading the MAC binaries tarball (will substitute things like ${version} automatically)", - "http://s3.amazonaws.com/downloads.basho.com/riak/${entity.majorVersion}/${entity.fullVersion}/osx/10.8/riak-${entity.fullVersion}-OSX-x86_64.tar.gz"); - - // NB these two needed for clients to access - @SetFromFlag("riakWebPort") - PortAttributeSensorAndConfigKey RIAK_WEB_PORT = new PortAttributeSensorAndConfigKey("riak.webPort", "Riak Web Port", "8098+"); - - @SetFromFlag("riakPbPort") - PortAttributeSensorAndConfigKey RIAK_PB_PORT = new PortAttributeSensorAndConfigKey("riak.pbPort", "Riak Protocol Buffers Port", "8087+"); - - AttributeSensor<Boolean> RIAK_PACKAGE_INSTALL = Sensors.newBooleanSensor( - "riak.install.package", "Flag to indicate whether Riak was installed using an OS package"); - AttributeSensor<Boolean> RIAK_ON_PATH = Sensors.newBooleanSensor( - "riak.install.onPath", "Flag to indicate whether Riak is available on the PATH"); - - AttributeSensor<Boolean> RIAK_NODE_HAS_JOINED_CLUSTER = Sensors.newBooleanSensor( - "riak.node.riakNodeHasJoinedCluster", "Flag to indicate whether the Riak node has joined a cluster member"); - - AttributeSensor<String> RIAK_NODE_NAME = Sensors.newStringSensor("riak.node", "Returns the riak node name as defined in vm.args"); - - // these needed for nodes to talk to each other, but not clients (so ideally set up in the security group for internal access) - PortAttributeSensorAndConfigKey HANDOFF_LISTENER_PORT = new PortAttributeSensorAndConfigKey("handoffListenerPort", "Handoff Listener Port", "8099+"); - PortAttributeSensorAndConfigKey EPMD_LISTENER_PORT = new PortAttributeSensorAndConfigKey("epmdListenerPort", "Erlang Port Mapper Daemon Listener Port", "4369"); - PortAttributeSensorAndConfigKey ERLANG_PORT_RANGE_START = new PortAttributeSensorAndConfigKey("erlangPortRangeStart", "Erlang Port Range Start", "6000+"); - PortAttributeSensorAndConfigKey ERLANG_PORT_RANGE_END = new PortAttributeSensorAndConfigKey("erlangPortRangeEnd", "Erlang Port Range End", "7999+"); - - @SetFromFlag("searchEnabled") - ConfigKey<Boolean> SEARCH_ENABLED = ConfigKeys.newBooleanConfigKey("riak.search", "Deploy Solr and configure Riak to use it", false); - - /** - * http://docs.basho.com/riak/latest/dev/using/search/ - * Solr is powered by Riak's Yokozuna engine and it is used through the riak webport - * So SEARCH_SOLR_PORT shouldn't be exposed - */ - ConfigKey<Integer> SEARCH_SOLR_PORT = ConfigKeys.newIntegerConfigKey("search.solr.port", "Solr port", 8983); - ConfigKey<Integer> SEARCH_SOLR_JMX_PORT = ConfigKeys.newIntegerConfigKey("search.solr.jmx_port", "Solr port", 8985); - - AttributeSensor<Integer> NODE_GETS = Sensors.newIntegerSensor("riak.node.gets", "Gets in the last minute"); - AttributeSensor<Integer> NODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.node.gets.total", "Total gets since node started"); - AttributeSensor<Integer> NODE_PUTS = Sensors.newIntegerSensor("riak.node.puts", "Puts in the last minute"); - AttributeSensor<Integer> NODE_PUTS_TOTAL = Sensors.newIntegerSensor("riak.node.puts.total", "Total puts since node started"); - AttributeSensor<Integer> VNODE_GETS = Sensors.newIntegerSensor("riak.vnode.gets"); - AttributeSensor<Integer> VNODE_GETS_TOTAL = Sensors.newIntegerSensor("riak.vnode.gets.total"); - - //Sensors for Riak Node Counters (within 1 minute window or lifetime of node. - //http://docs.basho.com/riak/latest/ops/running/stats-and-monitoring/#Statistics-from-Riak - AttributeSensor<Integer> VNODE_PUTS = Sensors.newIntegerSensor("riak.vnode.puts"); - AttributeSensor<Integer> VNODE_PUTS_TOTAL = Sensors.newIntegerSensor("riak.vnode.puts.total"); - AttributeSensor<Integer> READ_REPAIRS_TOTAL = Sensors.newIntegerSensor("riak.read.repairs.total"); - AttributeSensor<Integer> COORD_REDIRS_TOTAL = Sensors.newIntegerSensor("riak.coord.redirs.total"); - //Additional Riak node counters - AttributeSensor<Integer> MEMORY_PROCESSES_USED = Sensors.newIntegerSensor("riak.memory.processes.used"); - AttributeSensor<Integer> SYS_PROCESS_COUNT = Sensors.newIntegerSensor("riak.sys.process.count"); - AttributeSensor<Integer> PBC_CONNECTS = Sensors.newIntegerSensor("riak.pbc.connects"); - AttributeSensor<Integer> PBC_ACTIVE = Sensors.newIntegerSensor("riak.pbc.active"); - @SuppressWarnings("serial") - AttributeSensor<List<String>> RING_MEMBERS = Sensors.newSensor(new TypeToken<List<String>>() {}, - "ring.members", "all the riak nodes in the ring"); - - AttributeSensor<Integer> NODE_OPS = Sensors.newIntegerSensor("riak.node.ops", "Sum of node gets and puts in the last minute"); - AttributeSensor<Integer> NODE_OPS_TOTAL = Sensors.newIntegerSensor("riak.node.ops.total", "Sum of node gets and puts since the node started"); - - MethodEffector<Void> JOIN_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "joinCluster"); - MethodEffector<Void> LEAVE_RIAK_CLUSTER = new MethodEffector<Void>(RiakNode.class, "leaveCluster"); - MethodEffector<Void> REMOVE_FROM_CLUSTER = new MethodEffector<Void>(RiakNode.class, "removeNode"); - - AttributeSensor<Integer> RIAK_NODE_GET_FSM_TIME_MEAN = Sensors.newIntegerSensor("riak.node_get_fsm_time_mean", "Time between reception of client read request and subsequent response to client"); - AttributeSensor<Integer> RIAK_NODE_PUT_FSM_TIME_MEAN = Sensors.newIntegerSensor("riak.node_put_fsm_time_mean", "Time between reception of client write request and subsequent response to client"); - AttributeSensor<Integer> RIAK_OBJECT_COUNTER_MERGE_TIME_MEAN = Sensors.newIntegerSensor("riak.object_counter_merge_time_mean", "Time it takes to perform an Update Counter operation"); - AttributeSensor<Integer> RIAK_OBJECT_SET_MERGE_TIME_MEAN = Sensors.newIntegerSensor("riak.object_set_merge_time_mean", "Time it takes to perform an Update Set operation"); - AttributeSensor<Integer> RIAK_OBJECT_MAP_MERGE_TIME_MEAN = Sensors.newIntegerSensor("riak.object_map_merge_time_mean", "Time it takes to perform an Update Map operation"); - AttributeSensor<Integer> RIAK_CONSISTENT_GET_TIME_MEAN = Sensors.newIntegerSensor("riak.consistent_get_time_mean", "Strongly consistent read latency"); - AttributeSensor<Integer> RIAK_CONSISTENT_PUT_TIME_MEAN = Sensors.newIntegerSensor("riak.consistent_put_time_mean", "Strongly consistent write latency"); - - List<AttributeSensor<Integer>> ONE_MINUTE_SENSORS = ImmutableList.of(RIAK_NODE_GET_FSM_TIME_MEAN, RIAK_NODE_PUT_FSM_TIME_MEAN, - RIAK_OBJECT_COUNTER_MERGE_TIME_MEAN, RIAK_OBJECT_SET_MERGE_TIME_MEAN, RIAK_OBJECT_MAP_MERGE_TIME_MEAN, - RIAK_CONSISTENT_GET_TIME_MEAN, RIAK_CONSISTENT_PUT_TIME_MEAN); - - AttributeSensor<URI> RIAK_CONSOLE_URI = Attributes.MAIN_URI; - - // accessors, for use from template file - Integer getRiakWebPort(); - - Integer getRiakPbPort(); - - Integer getHandoffListenerPort(); - - Integer getEpmdListenerPort(); - - Integer getErlangPortRangeStart(); - - Integer getErlangPortRangeEnd(); - - Boolean isSearchEnabled(); - - Integer getSearchSolrPort(); - - Integer getSearchSolrJmxPort(); - - String getFullVersion(); - - String getMajorVersion(); - - String getOsMajorVersion(); - - // TODO add commitCluster() effector and add effectors joinCluster, leaveCluster, removeNode, recoverFailedNode which do not execute commitCluster() - // the commit where the commitCluster effector was available is adbf2dc1cb5df98b1e52d3ab35fa6bb4983b722f - - @Effector(description = "Join the Riak cluster on the given node") - void joinCluster(@EffectorParam(name = "nodeName") String nodeName); - - @Effector(description = "Leave the Riak cluster") - void leaveCluster(); - - @Effector(description = "Remove the given node from the Riak cluster") - void removeNode(@EffectorParam(name = "nodeName") String nodeName); - - @Effector(description = "Recover and join the Riak cluster on the given node") - void recoverFailedNode(@EffectorParam(name = "nodeName") String nodeName); - - @Effector(description = "Create or modify a bucket type before activation") - void bucketTypeCreate(@EffectorParam(name = "bucketTypeName") String bucketTypeName, - @EffectorParam(name = "bucketTypeProperties") String bucketTypeProperties); - - @Effector(description = "List all currently available bucket types and their activation status") - List<String> bucketTypeList(); - - @Effector(description = "Display the status and properties of a specific bucket type") - List<String> bucketTypeStatus(@EffectorParam(name = "bucketTypeName") String bucketTypeName); - - @Effector(description = "Update a bucket type after activation") - void bucketTypeUpdate(@EffectorParam(name = "bucketTypeName") String bucketTypeName, - @EffectorParam(name = "bucketTypeProperties") String bucketTypeProperties); - - @Effector(description = "Activate a bucket type") - void bucketTypeActivate(@EffectorParam(name = "bucketTypeName") String bucketTypeName); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java deleted file mode 100644 index 5fca3cc..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeDriver.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.riak; - -import brooklyn.entity.basic.SoftwareProcessDriver; - -import java.util.List; - -public interface RiakNodeDriver extends SoftwareProcessDriver { - - String getRiakEtcDir(); - - void joinCluster(String nodeName); - - void leaveCluster(); - - void removeNode(String nodeName); - - void recoverFailedNode(String nodeName); - - String getOsMajorVersion(); - - void bucketTypeCreate(String bucketTypeName, String bucketTypeProperties); - - List<String> bucketTypeList(); - - List<String> bucketTypeStatus(String bucketTypeName); - - void bucketTypeUpdate(String bucketTypeName, String bucketTypeProperties); - - void bucketTypeActivate(String bucketTypeName); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java deleted file mode 100644 index 7dda317..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeImpl.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.riak; - -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import javax.annotation.Nullable; - -import brooklyn.enricher.Enrichers; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.SoftwareProcessImpl; -import brooklyn.entity.webapp.WebAppServiceMethods; -import brooklyn.event.AttributeSensor; -import brooklyn.event.basic.AttributeSensorAndConfigKey; -import brooklyn.event.feed.http.HttpFeed; -import brooklyn.event.feed.http.HttpPollConfig; -import brooklyn.event.feed.http.HttpValueFunctions; -import brooklyn.location.MachineProvisioningLocation; -import brooklyn.location.access.BrooklynAccessUtils; -import brooklyn.location.cloud.CloudLocationConfig; -import brooklyn.util.collections.MutableSet; -import brooklyn.util.config.ConfigBag; -import brooklyn.util.guava.Functionals; -import brooklyn.util.time.Duration; - -import com.google.common.base.Preconditions; -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.collect.ContiguousSet; -import com.google.common.collect.DiscreteDomain; -import com.google.common.collect.Range; -import com.google.common.net.HostAndPort; - -public class RiakNodeImpl extends SoftwareProcessImpl implements RiakNode { - - private volatile HttpFeed httpFeed; - - @Override - public RiakNodeDriver getDriver() { - return (RiakNodeDriver) super.getDriver(); - } - - @Override - public Class<RiakNodeDriver> getDriverInterface() { - return RiakNodeDriver.class; - } - - @Override - public void init() { - super.init(); - // fail fast if config files not avail - Entities.getRequiredUrlConfig(this, RIAK_VM_ARGS_TEMPLATE_URL); - Entities.getRequiredUrlConfig(this, RIAK_APP_CONFIG_TEMPLATE_URL); - - Integer defaultMaxOpenFiles = RIAK_MAX_OPEN_FILES.getDefaultValue(); - Integer maxOpenFiles = getConfig(RiakNode.RIAK_MAX_OPEN_FILES); - Preconditions.checkArgument(maxOpenFiles >= defaultMaxOpenFiles , "Specified number of open files : %s : is less than the required minimum", - maxOpenFiles, defaultMaxOpenFiles); - } - - @SuppressWarnings("rawtypes") - public boolean isPackageDownloadUrlProvided() { - AttributeSensorAndConfigKey[] downloadProperties = { DOWNLOAD_URL_RHEL_CENTOS, DOWNLOAD_URL_UBUNTU, DOWNLOAD_URL_DEBIAN }; - for (AttributeSensorAndConfigKey property : downloadProperties) { - if (!((ConfigurationSupportInternal) config()).getRaw(property).isAbsent()) { - return true; - } - } - return false; - } - - @Override - protected Map<String, Object> obtainProvisioningFlags(@SuppressWarnings("rawtypes") MachineProvisioningLocation location) { - ConfigBag result = ConfigBag.newInstance(super.obtainProvisioningFlags(location)); - result.configure(CloudLocationConfig.OS_64_BIT, true); - return result.getAllConfig(); - } - - @Override - protected Collection<Integer> getRequiredOpenPorts() { - // TODO this creates a huge list of inbound ports; much better to define on a security group using range syntax! - int erlangRangeStart = getConfig(ERLANG_PORT_RANGE_START).iterator().next(); - int erlangRangeEnd = getConfig(ERLANG_PORT_RANGE_END).iterator().next(); - - Set<Integer> ports = MutableSet.copyOf(super.getRequiredOpenPorts()); - Set<Integer> erlangPorts = ContiguousSet.create(Range.open(erlangRangeStart, erlangRangeEnd), DiscreteDomain.integers()); - ports.addAll(erlangPorts); - - return ports; - } - - @Override - public void connectSensors() { - super.connectSensors(); - connectServiceUpIsRunning(); - HostAndPort accessible = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getRiakWebPort()); - - HttpFeed.Builder httpFeedBuilder = HttpFeed.builder() - .entity(this) - .period(500, TimeUnit.MILLISECONDS) - .baseUri(String.format("http://%s/stats", accessible.toString())) - .poll(new HttpPollConfig<Integer>(NODE_GETS) - .onSuccess(HttpValueFunctions.jsonContents("node_gets", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(NODE_GETS_TOTAL) - .onSuccess(HttpValueFunctions.jsonContents("node_gets_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(NODE_PUTS) - .onSuccess(HttpValueFunctions.jsonContents("node_puts", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(NODE_PUTS_TOTAL) - .onSuccess(HttpValueFunctions.jsonContents("node_puts_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(VNODE_GETS) - .onSuccess(HttpValueFunctions.jsonContents("vnode_gets", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(VNODE_GETS_TOTAL) - .onSuccess(HttpValueFunctions.jsonContents("vnode_gets_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(VNODE_PUTS) - .onSuccess(HttpValueFunctions.jsonContents("vnode_puts", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(VNODE_PUTS_TOTAL) - .onSuccess(HttpValueFunctions.jsonContents("vnode_puts_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(READ_REPAIRS_TOTAL) - .onSuccess(HttpValueFunctions.jsonContents("read_repairs_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(COORD_REDIRS_TOTAL) - .onSuccess(HttpValueFunctions.jsonContents("coord_redirs_total", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(MEMORY_PROCESSES_USED) - .onSuccess(HttpValueFunctions.jsonContents("memory_processes_used", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(SYS_PROCESS_COUNT) - .onSuccess(HttpValueFunctions.jsonContents("sys_process_count", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(PBC_CONNECTS) - .onSuccess(HttpValueFunctions.jsonContents("pbc_connects", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<Integer>(PBC_ACTIVE) - .onSuccess(HttpValueFunctions.jsonContents("pbc_active", Integer.class)) - .onFailureOrException(Functions.constant(-1))) - .poll(new HttpPollConfig<List<String>>(RING_MEMBERS) - .onSuccess(Functionals.chain( - HttpValueFunctions.jsonContents("ring_members", String[].class), - new Function<String[], List<String>>() { - @Nullable - @Override - public List<String> apply(@Nullable String[] strings) { - return Arrays.asList(strings); - } - } - )) - .onFailureOrException(Functions.constant(Arrays.asList(new String[0])))); - - for (AttributeSensor<Integer> sensor : ONE_MINUTE_SENSORS) { - httpFeedBuilder.poll(new HttpPollConfig<Integer>(sensor) - .period(Duration.ONE_MINUTE) - .onSuccess(HttpValueFunctions.jsonContents(sensor.getName().substring(5), Integer.class)) - .onFailureOrException(Functions.constant(-1))); - } - - httpFeed = httpFeedBuilder.build(); - - addEnricher(Enrichers.builder().combining(NODE_GETS, NODE_PUTS).computingSum().publishing(NODE_OPS).build()); - addEnricher(Enrichers.builder().combining(NODE_GETS_TOTAL, NODE_PUTS_TOTAL).computingSum().publishing(NODE_OPS_TOTAL).build()); - WebAppServiceMethods.connectWebAppServerPolicies(this); - } - - @Override - public void disconnectSensors() { - super.disconnectSensors(); - if (httpFeed != null) { - httpFeed.stop(); - } - disconnectServiceUpIsRunning(); - } - - @Override - public void joinCluster(String nodeName) { - getDriver().joinCluster(nodeName); - } - - @Override - public void leaveCluster() { - getDriver().leaveCluster(); - } - - @Override - public void removeNode(String nodeName) { - getDriver().removeNode(nodeName); - } - - @Override - public void bucketTypeCreate(String bucketTypeName, String bucketTypeProperties) { - getDriver().bucketTypeCreate(bucketTypeName, bucketTypeProperties); - } - - @Override - public List<String> bucketTypeList() { - return getDriver().bucketTypeList(); - } - - @Override - public List<String> bucketTypeStatus(String bucketTypeName) { - return getDriver().bucketTypeStatus(bucketTypeName); - } - - @Override - public void bucketTypeUpdate(String bucketTypeName, String bucketTypeProperties) { - getDriver().bucketTypeUpdate(bucketTypeName, bucketTypeProperties); - } - - @Override - public void bucketTypeActivate(String bucketTypeName) { - getDriver().bucketTypeActivate(bucketTypeName); - } - - @Override - public void recoverFailedNode(String nodeName) { - getDriver().recoverFailedNode(nodeName); - } - - @Override - public Integer getRiakWebPort() { - return getAttribute(RiakNode.RIAK_WEB_PORT); - } - - @Override - public Integer getRiakPbPort() { - return getAttribute(RiakNode.RIAK_PB_PORT); - } - - @Override - public Integer getHandoffListenerPort() { - return getAttribute(RiakNode.HANDOFF_LISTENER_PORT); - } - - @Override - public Integer getEpmdListenerPort() { - return getAttribute(RiakNode.EPMD_LISTENER_PORT); - } - - @Override - public Integer getErlangPortRangeStart() { - return getAttribute(RiakNode.ERLANG_PORT_RANGE_START); - } - - @Override - public Integer getErlangPortRangeEnd() { - return getAttribute(RiakNode.ERLANG_PORT_RANGE_END); - } - - @Override - public Boolean isSearchEnabled() { - return getConfig(RiakNode.SEARCH_ENABLED); - } - - @Override - public Integer getSearchSolrPort() { - return getConfig(RiakNode.SEARCH_SOLR_PORT); - } - - @Override - public Integer getSearchSolrJmxPort() { - return getConfig(RiakNode.SEARCH_SOLR_JMX_PORT); - } - - @Override - public String getMajorVersion() { - return getFullVersion().substring(0, 3); - } - - @Override - public String getFullVersion() { - return getConfig(RiakNode.SUGGESTED_VERSION); - } - - @Override - public String getOsMajorVersion() { - return getDriver().getOsMajorVersion(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java deleted file mode 100644 index 7ad15d7..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/riak/RiakNodeSshDriver.java +++ /dev/null @@ -1,614 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.riak; - -import static brooklyn.util.ssh.BashCommands.INSTALL_CURL; -import static brooklyn.util.ssh.BashCommands.INSTALL_TAR; -import static brooklyn.util.ssh.BashCommands.addSbinPathCommand; -import static brooklyn.util.ssh.BashCommands.alternatives; -import static brooklyn.util.ssh.BashCommands.chainGroup; -import static brooklyn.util.ssh.BashCommands.commandToDownloadUrlAs; -import static brooklyn.util.ssh.BashCommands.ifExecutableElse; -import static brooklyn.util.ssh.BashCommands.ifNotExecutable; -import static brooklyn.util.ssh.BashCommands.ok; -import static brooklyn.util.ssh.BashCommands.sudo; -import static brooklyn.util.text.StringEscapes.BashStringEscapes.escapeLiteralForDoubleQuotedBash; -import static java.lang.String.format; - -import java.net.URI; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import brooklyn.entity.java.JavaSoftwareProcessSshDriver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.lifecycle.ScriptHelper; -import brooklyn.entity.software.SshEffectorTasks; -import brooklyn.location.OsDetails; -import brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.net.Urls; -import brooklyn.util.os.Os; -import brooklyn.util.ssh.BashCommands; -import brooklyn.util.task.DynamicTasks; -import brooklyn.util.task.ssh.SshTasks; -import brooklyn.util.text.Strings; - -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - -// TODO: Alter -env ERL_CRASH_DUMP path in vm.args -public class RiakNodeSshDriver extends JavaSoftwareProcessSshDriver implements RiakNodeDriver { - - private static final Logger LOG = LoggerFactory.getLogger(RiakNodeSshDriver.class); - private static final String sbinPath = "$PATH:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"; - private static final String INSTALLING_FALLBACK = INSTALLING + "_fallback"; - - public RiakNodeSshDriver(final RiakNodeImpl entity, final SshMachineLocation machine) { - super(entity, machine); - } - - @Override - protected String getLogFileLocation() { - return "/var/log/riak/solr.log"; - } - - @Override - public RiakNodeImpl getEntity() { - return RiakNodeImpl.class.cast(super.getEntity()); - } - - @Override - public Map<String, String> getShellEnvironment() { - MutableMap<String, String> result = MutableMap.copyOf(super.getShellEnvironment()); - // how to change epmd port, according to - // http://serverfault.com/questions/582787/how-to-change-listening-interface-of-rabbitmqs-epmd-port-4369 - if (getEntity().getEpmdListenerPort() != null) { - result.put("ERL_EPMD_PORT", Integer.toString(getEntity().getEpmdListenerPort())); - } - result.put("WAIT_FOR_ERLANG", "60"); - return result; - } - - @Override - public void preInstall() { - resolver = Entities.newDownloader(this); - setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("riak-%s", getVersion())))); - - // Set package install attribute - OsDetails osDetails = getMachine().getMachineDetails().getOsDetails(); - if (osDetails.isLinux()) { - entity.setAttribute(RiakNode.RIAK_PACKAGE_INSTALL, true); - } else if (osDetails.isMac()) { - entity.setAttribute(RiakNode.RIAK_PACKAGE_INSTALL, false); - } - } - - @Override - public void install() { - if (entity.getConfig(Attributes.DOWNLOAD_URL) != null) { - LOG.warn("Ignoring download.url {}, use download.url.rhelcentos or download.url.mac", entity.getConfig(Attributes.DOWNLOAD_URL)); - } - - OsDetails osDetails = getMachine().getMachineDetails().getOsDetails(); - List<String> commands = Lists.newLinkedList(); - if (osDetails.isLinux()) { - if (getEntity().isPackageDownloadUrlProvided()) { - commands.addAll(installLinuxFromPackageUrl()); - } else { - commands.addAll(installFromPackageCloud()); - } - } else if (osDetails.isMac()) { - commands.addAll(installMac()); - } else if (osDetails.isWindows()) { - throw new UnsupportedOperationException("RiakNode not supported on Windows instances"); - } else { - throw new IllegalStateException("Machine was not detected as linux, mac or windows! Installation does not know how to proceed with " + - getMachine() + ". Details: " + getMachine().getMachineDetails().getOsDetails()); - } - - int result = newScript(INSTALLING) - .body.append(commands) - .failIfBodyEmpty() - .execute(); - - if (result != 0 && osDetails.isLinux()) { - result = newScript(INSTALLING_FALLBACK) - .body.append(installLinuxFromPackageUrl()) - .execute(); - } - - if (result != 0) { - throw new IllegalStateException(String.format("Install failed with result %d", result)); - } - } - - private List<String> installLinuxFromPackageUrl() { - DynamicTasks.queueIfPossible(SshTasks.dontRequireTtyForSudo(getMachine(), SshTasks.OnFailingTask.WARN_OR_IF_DYNAMIC_FAIL_MARKING_INESSENTIAL)).orSubmitAndBlock(); - - String expandedInstallDir = getExpandedInstallDir(); - String installBin = Urls.mergePaths(expandedInstallDir, "bin"); - String saveAsYum = "riak.rpm"; - String saveAsApt = "riak.deb"; - OsDetails osDetails = getMachine().getOsDetails(); - - String downloadUrl; - String osReleaseCmd; - if ("debian".equalsIgnoreCase(osDetails.getName())) { - // TODO osDetails.getName() is returning "linux", instead of debian/ubuntu on AWS with jenkins image, - // running as integration test targetting localhost. - // TODO Debian support (default debian image fails with 'sudo: command not found') - downloadUrl = (String)entity.getAttribute(RiakNode.DOWNLOAD_URL_DEBIAN); - osReleaseCmd = osDetails.getVersion().substring(0, osDetails.getVersion().indexOf(".")); - } else { - // assume Ubuntu - downloadUrl = (String)entity.getAttribute(RiakNode.DOWNLOAD_URL_UBUNTU); - osReleaseCmd = "`lsb_release -sc` && " + - "export OS_RELEASE=`([[ \"lucid natty precise\" =~ (^| )\\$OS_RELEASE($| ) ]] && echo $OS_RELEASE || echo precise)`"; - } - String apt = chainGroup( - //debian fix - "export PATH=" + sbinPath, - "which apt-get", - ok(sudo("apt-get -y --allow-unauthenticated install logrotate libpam0g-dev libssl0.9.8")), - "export OS_NAME=" + Strings.toLowerCase(osDetails.getName()), - "export OS_RELEASE=" + osReleaseCmd, - String.format("wget -O %s %s", saveAsApt, downloadUrl), - sudo(String.format("dpkg -i %s", saveAsApt))); - String yum = chainGroup( - "which yum", - ok(sudo("yum -y install openssl")), - String.format("wget -O %s %s", saveAsYum, entity.getAttribute(RiakNode.DOWNLOAD_URL_RHEL_CENTOS)), - sudo(String.format("yum localinstall -y %s", saveAsYum))); - return ImmutableList.<String>builder() - .add("mkdir -p " + installBin) - .add(INSTALL_CURL) - .add(alternatives(apt, yum)) - .add("ln -s `which riak` " + Urls.mergePaths(installBin, "riak")) - .add("ln -s `which riak-admin` " + Urls.mergePaths(installBin, "riak-admin")) - .build(); - } - - private List<String> installFromPackageCloud() { - OsDetails osDetails = getMachine().getMachineDetails().getOsDetails(); - return ImmutableList.<String>builder() - .add(osDetails.getName().toLowerCase().contains("debian") ? addSbinPathCommand() : "") - .add(ifNotExecutable("curl", INSTALL_CURL)) - .addAll(ifExecutableElse("yum", installDebianBased(), installRpmBased())) - .build(); - } - - private ImmutableList<String> installDebianBased() { - return ImmutableList.<String>builder() - .add("curl https://packagecloud.io/install/repositories/basho/riak/script.deb.sh | " + BashCommands.sudo("bash")) - .add(BashCommands.sudo("apt-get install --assume-yes riak=" + getEntity().getFullVersion() + "-1")) - .build(); - } - - private ImmutableList<String> installRpmBased() { - return ImmutableList.<String>builder() - .add("curl https://packagecloud.io/install/repositories/basho/riak/script.rpm.sh | " + BashCommands.sudo("bash")) - .add(BashCommands.sudo("yum install -y riak-" + getEntity().getFullVersion() + "*")) - .build(); - } - - protected List<String> installMac() { - String saveAs = resolver.getFilename(); - String url = entity.getAttribute(RiakNode.DOWNLOAD_URL_MAC); - return ImmutableList.<String>builder() - .add(INSTALL_TAR) - .add(INSTALL_CURL) - .add(commandToDownloadUrlAs(url, saveAs)) - .add("tar xzvf " + saveAs) - .build(); - } - - @Override - public void customize() { - checkRiakOnPath(); - - //create entity's runDir - newScript(CUSTOMIZING).execute(); - - OsDetails osDetails = getMachine().getMachineDetails().getOsDetails(); - - List<String> commands = Lists.newLinkedList(); - commands.add(sudo("mkdir -p " + getRiakEtcDir())); - - if (isVersion1()) { - String vmArgsTemplate = processTemplate(entity.getConfig(RiakNode.RIAK_VM_ARGS_TEMPLATE_URL)); - String saveAsVmArgs = Urls.mergePaths(getRunDir(), "vm.args"); - DynamicTasks.queue(SshEffectorTasks.put(saveAsVmArgs).contents(vmArgsTemplate)); - commands.add(sudo("mv " + saveAsVmArgs + " " + getRiakEtcDir())); - - String appConfigTemplate = processTemplate(entity.getConfig(RiakNode.RIAK_APP_CONFIG_TEMPLATE_URL)); - String saveAsAppConfig = Urls.mergePaths(getRunDir(), "app.config"); - DynamicTasks.queue(SshEffectorTasks.put(saveAsAppConfig).contents(appConfigTemplate)); - commands.add(sudo("mv " + saveAsAppConfig + " " + getRiakEtcDir())); - } else { - String templateUrl = osDetails.isMac() ? entity.getConfig(RiakNode.RIAK_CONF_TEMPLATE_URL_MAC) : - entity.getConfig(RiakNode.RIAK_CONF_TEMPLATE_URL_LINUX); - String riakConfContent = processTemplate(templateUrl); - String saveAsRiakConf = Urls.mergePaths(getRunDir(), "riak.conf"); - - if(Strings.isNonBlank(entity.getConfig(RiakNode.RIAK_CONF_ADDITIONAL_CONTENT))) { - String additionalConfigContent = processTemplateContents(entity.getConfig(RiakNode.RIAK_CONF_ADDITIONAL_CONTENT)); - riakConfContent += "\n## Brooklyn note: additional config\n"; - riakConfContent += additionalConfigContent; - } - - DynamicTasks.queue(SshEffectorTasks.put(saveAsRiakConf).contents(riakConfContent)); - commands.add(sudo("mv " + saveAsRiakConf + " " + getRiakEtcDir())); - } - - //increase open file limit (default min for riak is: 4096) - //TODO: detect the actual limit then do the modification. - //TODO: modify ulimit for linux distros - // commands.add(sudo("launchctl limit maxfiles 4096 32768")); - if (osDetails.isMac()) { - commands.add("ulimit -n 4096"); - } - - if (osDetails.isLinux() && isVersion1()) { - commands.add(sudo("chown -R riak:riak " + getRiakEtcDir())); - } - - // TODO platform_*_dir - // TODO riak config log - - ScriptHelper customizeScript = newScript(CUSTOMIZING) - .failOnNonZeroResultCode() - .body.append(commands); - - if (!isRiakOnPath()) { - addRiakOnPath(customizeScript); - } - customizeScript.failOnNonZeroResultCode().execute(); - - if (osDetails.isLinux()) { - ImmutableMap<String, String> sysctl = ImmutableMap.<String, String>builder() - .put("vm.swappiness", "0") - .put("net.core.somaxconn", "40000") - .put("net.ipv4.tcp_max_syn_backlog", "40000") - .put("net.ipv4.tcp_sack", "1") - .put("net.ipv4.tcp_window_scaling", "15") - .put("net.ipv4.tcp_fin_timeout", "1") - .put("net.ipv4.tcp_keepalive_intvl", "30") - .put("net.ipv4.tcp_tw_reuse", "1") - .put("net.ipv4.tcp_moderate_rcvbuf", "1") - .build(); - - ScriptHelper optimize = newScript(CUSTOMIZING + "network") - .body.append(sudo("sysctl " + Joiner.on(' ').withKeyValueSeparator("=").join(sysctl))); - - Optional<Boolean> enable = Optional.fromNullable(entity.getConfig(RiakNode.OPTIMIZE_HOST_NETWORKING)); - if (!enable.isPresent()) optimize.inessential(); - if (enable.or(true)) optimize.execute(); - } - - //set the riak node name - entity.setAttribute(RiakNode.RIAK_NODE_NAME, format("riak@%s", getSubnetHostname())); - } - - @Override - public void launch() { - List<String> commands = Lists.newLinkedList(); - - if (isPackageInstall()) { - commands.add(addSbinPathCommand()); - commands.add(sudo(format("sh -c \"ulimit -n %s && service riak start\"", maxOpenFiles()))); - } else { - // NOTE: See instructions at http://superuser.com/questions/433746/is-there-a-fix-for-the-too-many-open-files-in-system-error-on-os-x-10-7-1 - // for increasing the system limit for number of open files - commands.add("ulimit -n 65536 || true"); // `BashCommands.ok` will put this in parentheses, which will set ulimit -n in the subshell - commands.add(format("%s start >/dev/null 2>&1 < /dev/null &", getRiakCmd())); - } - - ScriptHelper launchScript = newScript(LAUNCHING) - .body.append(commands); - - if (!isRiakOnPath()) { - addRiakOnPath(launchScript); - } - launchScript.failOnNonZeroResultCode().execute(); - - String mainUri = String.format("http://%s:%s/admin", entity.getAttribute(Attributes.HOSTNAME), entity.getAttribute(RiakNode.RIAK_WEB_PORT)); - entity.setAttribute(Attributes.MAIN_URI, URI.create(mainUri)); - } - - @Override - public void stop() { - leaveCluster(); - - String command = format("%s stop", getRiakCmd()); - command = isPackageInstall() ? sudo(command) : command; - - ScriptHelper stopScript = newScript(ImmutableMap.of(USE_PID_FILE, false), STOPPING) - .body.append(command); - - if (!isRiakOnPath()) { - addRiakOnPath(stopScript); - } - - int result = stopScript.failOnNonZeroResultCode().execute(); - if (result != 0) { - newScript(ImmutableMap.of(USE_PID_FILE, false), STOPPING).execute(); - } - } - - @Override - public boolean isRunning() { - // Version 2.0.0 requires sudo for `riak ping` - ScriptHelper checkRunningScript = newScript(CHECK_RUNNING) - .body.append(sudo(format("%s ping", getRiakCmd()))); - - if (!isRiakOnPath()) { - addRiakOnPath(checkRunningScript); - } - return (checkRunningScript.execute() == 0); - } - - public boolean isPackageInstall() { - return entity.getAttribute(RiakNode.RIAK_PACKAGE_INSTALL); - } - - public boolean isRiakOnPath() { - return entity.getAttribute(RiakNode.RIAK_ON_PATH); - } - - public String getRiakEtcDir() { - return isPackageInstall() ? "/etc/riak" : Urls.mergePaths(getExpandedInstallDir(), "etc"); - } - - protected String getRiakCmd() { - return isPackageInstall() ? "riak" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak"); - } - - protected String getRiakAdminCmd() { - return isPackageInstall() ? "riak-admin" : Urls.mergePaths(getExpandedInstallDir(), "bin/riak-admin"); - } - - // TODO find a way to batch commit the changes, instead of committing for every operation. - - @Override - public void joinCluster(String nodeName) { - if (getRiakName().equals(nodeName)) { - log.warn("Cannot join Riak node: {} to itself", nodeName); - } else { - if (!hasJoinedCluster()) { - ScriptHelper joinClusterScript = newScript("joinCluster") - .body.append(sudo(format("%s cluster join %s", getRiakAdminCmd(), nodeName))) - .body.append(sudo(format("%s cluster plan", getRiakAdminCmd()))) - .body.append(sudo(format("%s cluster commit", getRiakAdminCmd()))) - .failOnNonZeroResultCode(); - - if (!isRiakOnPath()) { - addRiakOnPath(joinClusterScript); - } - - joinClusterScript.execute(); - - entity.setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.TRUE); - } else { - log.warn("entity {}: is already in the riak cluster", entity.getId()); - } - } - } - - @Override - public void leaveCluster() { - if (hasJoinedCluster()) { - ScriptHelper leaveClusterScript = newScript("leaveCluster") - .body.append(sudo(format("%s cluster leave", getRiakAdminCmd()))) - .body.append(sudo(format("%s cluster plan", getRiakAdminCmd()))) - .body.append(sudo(format("%s cluster commit", getRiakAdminCmd()))) - .failOnNonZeroResultCode(); - - if (!isRiakOnPath()) { - addRiakOnPath(leaveClusterScript); - } - - leaveClusterScript.execute(); - - entity.setAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER, Boolean.FALSE); - } else { - log.warn("entity {}: has already left the riak cluster", entity.getId()); - } - } - - @Override - public void removeNode(String nodeName) { - ScriptHelper removeNodeScript = newScript("removeNode") - .body.append(sudo(format("%s cluster force-remove %s", getRiakAdminCmd(), nodeName))) - .body.append(sudo(format("%s down %s", getRiakAdminCmd(), nodeName))) - .body.append(sudo(format("%s cluster plan", getRiakAdminCmd()))) - .body.append(sudo(format("%s cluster commit", getRiakAdminCmd()))) - .failOnNonZeroResultCode(); - - if (!isRiakOnPath()) { - addRiakOnPath(removeNodeScript); - } - - removeNodeScript.execute(); - } - - @Override - public void bucketTypeCreate(String bucketTypeName, String bucketTypeProperties) { - ScriptHelper bucketTypeCreateScript = newScript("bucket-type_create " + bucketTypeName) - .body.append(sudo(format("%s bucket-type create %s %s", - getRiakAdminCmd(), - bucketTypeName, - escapeLiteralForDoubleQuotedBash(bucketTypeProperties)))); - if(!isRiakOnPath()) { - addRiakOnPath(bucketTypeCreateScript); - } - bucketTypeCreateScript.body.append(sudo(format("%s bucket-type activate %s", getRiakAdminCmd(), bucketTypeName))) - .failOnNonZeroResultCode(); - - bucketTypeCreateScript.execute(); - } - - @Override - public List<String> bucketTypeList() { - ScriptHelper bucketTypeListScript = newScript("bucket-types_list") - .body.append(sudo(format("%s bucket-type list", getRiakAdminCmd()))) - .gatherOutput() - .noExtraOutput() - .failOnNonZeroResultCode(); - if (!isRiakOnPath()) { - addRiakOnPath(bucketTypeListScript); - } - bucketTypeListScript.execute(); - String stdout = bucketTypeListScript.getResultStdout(); - return Arrays.asList(stdout.split("[\\r\\n]+")); - } - - @Override - public List<String> bucketTypeStatus(String bucketTypeName) { - ScriptHelper bucketTypeStatusScript = newScript("bucket-type_status") - .body.append(sudo(format("%s bucket-type status %s", getRiakAdminCmd(), bucketTypeName))) - .gatherOutput() - .noExtraOutput() - .failOnNonZeroResultCode(); - if (!isRiakOnPath()) { - addRiakOnPath(bucketTypeStatusScript); - } - bucketTypeStatusScript.execute(); - String stdout = bucketTypeStatusScript.getResultStdout(); - return Arrays.asList(stdout.split("[\\r\\n]+")); - } - - @Override - public void bucketTypeUpdate(String bucketTypeName, String bucketTypeProperties) { - ScriptHelper bucketTypeStatusScript = newScript("bucket-type_update") - .body.append(sudo(format("%s bucket-type update %s %s", - getRiakAdminCmd(), - bucketTypeName, - escapeLiteralForDoubleQuotedBash(bucketTypeProperties)))) - .failOnNonZeroResultCode(); - if (!isRiakOnPath()) { - addRiakOnPath(bucketTypeStatusScript); - } - bucketTypeStatusScript.execute(); - } - - @Override - public void bucketTypeActivate(String bucketTypeName) { - ScriptHelper bucketTypeStatusScript = newScript("bucket-type_activate") - .body.append(sudo(format("%s bucket-type activate %s", getRiakAdminCmd(), bucketTypeName))) - .failOnNonZeroResultCode(); - if (!isRiakOnPath()) { - addRiakOnPath(bucketTypeStatusScript); - } - bucketTypeStatusScript.execute(); - } - - @Override - public void recoverFailedNode(String nodeName) { - //TODO find ways to detect a faulty/failed node - //argument passed 'node' is any working node in the riak cluster - //following the instruction from: http://docs.basho.com/riak/latest/ops/running/recovery/failed-node/ - - if (hasJoinedCluster()) { - String failedNodeName = getRiakName(); - - - String stopCommand = format("%s stop", getRiakCmd()); - stopCommand = isPackageInstall() ? sudo(stopCommand) : stopCommand; - - String startCommand = format("%s start > /dev/null 2>&1 < /dev/null &", getRiakCmd()); - startCommand = isPackageInstall() ? sudo(startCommand) : startCommand; - - ScriptHelper recoverNodeScript = newScript("recoverNode") - .body.append(stopCommand) - .body.append(format("%s down %s", getRiakAdminCmd(), failedNodeName)) - .body.append(sudo(format("rm -rf %s", getRingStateDir()))) - .body.append(startCommand) - .body.append(sudo(format("%s cluster join %s", getRiakAdminCmd(), nodeName))) - .body.append(sudo(format("%s cluster plan", getRiakAdminCmd()))) - .body.append(sudo(format("%s cluster commit", getRiakAdminCmd()))) - .failOnNonZeroResultCode(); - - if (!isRiakOnPath()) { - addRiakOnPath(recoverNodeScript); - } - - recoverNodeScript.execute(); - - } else { - log.warn("entity {}: is not in the riak cluster", entity.getId()); - } - } - - @Override - public void setup() { - if(entity.getConfig(RiakNode.SEARCH_ENABLED)) { - // JavaSoftwareProcessSshDriver.setup() is called in order to install java - super.setup(); - } - } - - private Boolean hasJoinedCluster() { - return Boolean.TRUE.equals(entity.getAttribute(RiakNode.RIAK_NODE_HAS_JOINED_CLUSTER)); - } - - protected void checkRiakOnPath() { - boolean riakOnPath = newScript("riakOnPath") - .body.append("which riak") - .execute() == 0; - entity.setAttribute(RiakNode.RIAK_ON_PATH, riakOnPath); - } - - private String getRiakName() { - return entity.getAttribute(RiakNode.RIAK_NODE_NAME); - } - - private String getRingStateDir() { - //TODO: check for non-package install. - return isPackageInstall() ? "/var/lib/riak/ring" : Urls.mergePaths(getExpandedInstallDir(), "lib/ring"); - } - - protected boolean isVersion1() { - return getVersion().startsWith("1."); - } - - @Override - public String getOsMajorVersion() { - OsDetails osDetails = getMachine().getMachineDetails().getOsDetails(); - String osVersion = osDetails.getVersion(); - return osVersion.contains(".") ? osVersion.substring(0, osVersion.indexOf(".")) : osVersion; - } - - private void addRiakOnPath(ScriptHelper scriptHelper) { - Map<String, String> newPathVariable = ImmutableMap.of("PATH", sbinPath); -// log.warn("riak command not found on PATH. Altering future commands' environment variables from {} to {}", getShellEnvironment(), newPathVariable); - scriptHelper.environmentVariablesReset(newPathVariable); - } - - public Integer maxOpenFiles() { - return entity.getConfig(RiakNode.RIAK_MAX_OPEN_FILES); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServer.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServer.java b/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServer.java deleted file mode 100644 index fc8d28e..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServer.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.solr; - -import java.util.Map; - -import org.apache.brooklyn.catalog.Catalog; -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.BrooklynConfigKeys; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.java.UsesJava; -import brooklyn.entity.java.UsesJavaMXBeans; -import brooklyn.entity.java.UsesJmx; -import brooklyn.entity.proxying.ImplementedBy; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.PortAttributeSensorAndConfigKey; -import brooklyn.location.basic.PortRanges; -import brooklyn.util.flags.SetFromFlag; -import brooklyn.util.time.Duration; - -import com.google.common.collect.Maps; -import com.google.common.reflect.TypeToken; - -/** - * An {@link brooklyn.entity.Entity} that represents a Solr node. - */ -@Catalog(name="Apache Solr Node", description="Solr is the popular, blazing fast open source enterprise search " + - "platform from the Apache Lucene project.", iconUrl="classpath:///solr-logo.jpeg") -@ImplementedBy(SolrServerImpl.class) -public interface SolrServer extends SoftwareProcess, UsesJava, UsesJmx, UsesJavaMXBeans { - - @SetFromFlag("version") - ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "4.7.2"); - - @SetFromFlag("downloadUrl") - BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( - SoftwareProcess.DOWNLOAD_URL, "${driver.mirrorUrl}/${version}/solr-${version}.tgz"); - - /** download mirror, if desired */ - @SetFromFlag("mirrorUrl") - ConfigKey<String> MIRROR_URL = ConfigKeys.newStringConfigKey("solr.install.mirror.url", "URL of mirror", - "http://mirrors.ukfast.co.uk/sites/ftp.apache.org/lucene/solr/"); - - @SetFromFlag("solrPort") - PortAttributeSensorAndConfigKey SOLR_PORT = new PortAttributeSensorAndConfigKey("solr.http.port", "Solr HTTP communications port", - PortRanges.fromString("8983+")); - - @SetFromFlag("solrConfigTemplateUrl") - ConfigKey<String> SOLR_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey( - "solr.config.templateUrl", "Template file (in freemarker format) for the solr.xml config file", - "classpath://brooklyn/entity/nosql/solr/solr.xml"); - - @SetFromFlag("coreConfigMap") - ConfigKey<Map<String, String>> SOLR_CORE_CONFIG = ConfigKeys.newConfigKey(new TypeToken<Map<String, String>>() { }, - "solr.core.config", "Map of core names to core configuration archive URL", - Maps.<String, String>newHashMap()); - - ConfigKey<Duration> START_TIMEOUT = ConfigKeys.newConfigKeyWithDefault(BrooklynConfigKeys.START_TIMEOUT, Duration.FIVE_MINUTES); - - /* Accessors used from template */ - - Integer getSolrPort(); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerDriver.java deleted file mode 100644 index dd44499..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerDriver.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.solr; - -import brooklyn.entity.basic.SoftwareProcessDriver; -import brooklyn.entity.java.JavaSoftwareProcessDriver; - -public interface SolrServerDriver extends JavaSoftwareProcessDriver { - - Integer getSolrPort(); - - String getSolrConfigTemplateUrl(); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerImpl.java deleted file mode 100644 index 3d32a93..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerImpl.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.solr; - -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.SoftwareProcessImpl; -import brooklyn.event.feed.http.HttpFeed; -import brooklyn.event.feed.http.HttpPollConfig; -import brooklyn.event.feed.http.HttpValueFunctions; -import brooklyn.location.access.BrooklynAccessUtils; -import com.google.common.base.Functions; -import com.google.common.net.HostAndPort; - -import java.net.URI; -import java.util.concurrent.TimeUnit; - -/** - * Implementation of {@link SolrServer}. - */ -public class SolrServerImpl extends SoftwareProcessImpl implements SolrServer { - - @Override - public Integer getSolrPort() { - return getAttribute(SolrServer.SOLR_PORT); - } - - @Override - public Class<SolrServerDriver> getDriverInterface() { - return SolrServerDriver.class; - } - - private volatile HttpFeed httpFeed; - - @Override - protected void connectSensors() { - super.connectSensors(); - - HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getSolrPort()); - - String solrUri = String.format("http://%s:%d/solr", hp.getHostText(), hp.getPort()); - setAttribute(Attributes.MAIN_URI, URI.create(solrUri)); - - httpFeed = HttpFeed.builder() - .entity(this) - .period(500, TimeUnit.MILLISECONDS) - .baseUri(solrUri) - .poll(new HttpPollConfig<Boolean>(SERVICE_UP) - .onSuccess(HttpValueFunctions.responseCodeEquals(200)) - .onFailureOrException(Functions.constant(false))) - .build(); - } - - @Override - public void disconnectSensors() { - super.disconnectSensors(); - - if (httpFeed != null) httpFeed.stop(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerSshDriver.java b/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerSshDriver.java deleted file mode 100644 index 2174b36..0000000 --- a/software/nosql/src/main/java/brooklyn/entity/nosql/solr/SolrServerSshDriver.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.solr; - -import static java.lang.String.format; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -import brooklyn.entity.java.JavaSoftwareProcessSshDriver; -import brooklyn.entity.java.UsesJmx; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; -import brooklyn.entity.basic.Entities; -import brooklyn.location.Location; -import brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.file.ArchiveUtils; -import brooklyn.util.net.Networking; -import brooklyn.util.net.Urls; -import brooklyn.util.os.Os; -import brooklyn.util.ssh.BashCommands; -import brooklyn.util.stream.Streams; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; - -/** - * Start a {@link SolrServer} in a {@link Location} accessible over ssh. - */ -public class SolrServerSshDriver extends JavaSoftwareProcessSshDriver implements SolrServerDriver { - - private static final Logger log = LoggerFactory.getLogger(SolrServerSshDriver.class); - - public SolrServerSshDriver(SolrServerImpl entity, SshMachineLocation machine) { - super(entity, machine); - } - - @Override - public Integer getSolrPort() { return entity.getAttribute(SolrServer.SOLR_PORT); } - - @Override - public String getSolrConfigTemplateUrl() { return entity.getConfig(SolrServer.SOLR_CONFIG_TEMPLATE_URL); } - - public String getMirrorUrl() { return entity.getConfig(SolrServer.MIRROR_URL); } - - public String getPidFile() { return Os.mergePaths(getRunDir(), "solr.pid"); } - - @Override - public void preInstall() { - resolver = Entities.newDownloader(this); - setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("solr-%s", getVersion())))); - } - - @Override - public void install() { - List<String> urls = resolver.getTargets(); - String saveAs = resolver.getFilename(); - - List<String> commands = ImmutableList.<String>builder() - .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) - .add(BashCommands.INSTALL_TAR) - .add("tar xzfv " + saveAs) - .build(); - - newScript(INSTALLING) - .failOnNonZeroResultCode() - .body.append(commands) - .execute(); - } - - public Set<Integer> getPortsUsed() { - Set<Integer> result = Sets.newLinkedHashSet(super.getPortsUsed()); - result.addAll(getPortMap().values()); - return result; - } - - private Map<String, Integer> getPortMap() { - return ImmutableMap.<String, Integer>builder() - .put("solrPort", getSolrPort()) - .put("jmxPort", entity.getAttribute(UsesJmx.JMX_PORT)) - .put("rmiPort", entity.getAttribute(UsesJmx.RMI_REGISTRY_PORT)) - .build(); - } - - @Override - public void customize() { - log.debug("Customizing {}", entity); - Networking.checkPortsValid(getPortMap()); - - ImmutableList.Builder<String> commands = new ImmutableList.Builder<String>() - .add("mkdir contrib") - .add("mkdir solr") - .add(String.format("cp -R %s/example/{etc,contexts,lib,logs,resources,webapps} .", getExpandedInstallDir())) - .add(String.format("cp %s/example/start.jar .", getExpandedInstallDir())) - .add(String.format("cp %s/dist/*.jar lib/", getExpandedInstallDir())) - .add(String.format("cp %s/contrib/*/lib/*.jar contrib/", getExpandedInstallDir())); - - newScript(CUSTOMIZING) - .body.append(commands.build()) - .execute(); - - // Copy the solr.xml configuration file across - String configFileContents = processTemplate(getSolrConfigTemplateUrl()); - String destinationConfigFile = String.format("%s/solr/solr.xml", getRunDir()); - getMachine().copyTo(Streams.newInputStreamWithContents(configFileContents), destinationConfigFile); - - // Copy the core definitions across - Map<String, String> coreConfig = entity.getConfig(SolrServer.SOLR_CORE_CONFIG); - for (String core : coreConfig.keySet()) { - String url = coreConfig.get(core); - String solr = Urls.mergePaths(getRunDir(), "solr"); - ArchiveUtils.deploy(url, getMachine(), solr); - } - } - - @Override - public void launch() { - newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING) - .body.append("nohup java $JAVA_OPTS -jar start.jar > ./logs/console.log 2>&1 &") - .execute(); - } - - @Override - public boolean isRunning() { - return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0; - } - - @Override - public void stop() { - newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute(); - } - - @Override - protected String getLogFileLocation() { - return Urls.mergePaths(getRunDir(), "solr", "logs", "solr.log"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraCluster.java new file mode 100644 index 0000000..890ab60 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraCluster.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.cassandra; + +import brooklyn.entity.proxying.ImplementedBy; + +/** + * @deprecated since 0.7.0; use {@link CassandraDatacenter} which is equivalent but has + * a less ambiguous name; <em>Cluster</em> in Cassandra corresponds to what Brooklyn terms a <em>Fabric</em>. + */ +@Deprecated +@ImplementedBy(CassandraClusterImpl.class) +public interface CassandraCluster extends CassandraDatacenter { +}
