http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerSshDriver.java new file mode 100644 index 0000000..535bab6 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerSshDriver.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import org.apache.brooklyn.entity.nosql.mongodb.AbstractMongoDBSshDriver; +import org.apache.brooklyn.entity.nosql.mongodb.MongoDBDriver; + +import brooklyn.location.basic.SshMachineLocation; + +public class MongoDBConfigServerSshDriver extends AbstractMongoDBSshDriver implements MongoDBDriver { + + public MongoDBConfigServerSshDriver(MongoDBConfigServerImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + public MongoDBConfigServerImpl getEntity() { + return MongoDBConfigServerImpl.class.cast(super.getEntity()); + } + + @Override + public void launch() { + launch(getArgsBuilderWithDefaults(getEntity()) + .add("--configsvr") + .add("--dbpath", getDataDirectory())); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouter.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouter.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouter.java new file mode 100644 index 0000000..195b10f --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouter.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import org.apache.brooklyn.catalog.Catalog; +import org.apache.brooklyn.entity.nosql.mongodb.AbstractMongoDBServer; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.Sensors; +import brooklyn.util.time.Duration; + +import com.google.common.reflect.TypeToken; + +@Catalog(name="MongoDB Router", + description="MongoDB (from \"humongous\") is a scalable, high-performance, open source NoSQL database", + iconUrl="classpath:///mongodb-logo.png") +@ImplementedBy(MongoDBRouterImpl.class) +public interface MongoDBRouter extends AbstractMongoDBServer { + + @SuppressWarnings("serial") + ConfigKey<Iterable<String>> CONFIG_SERVERS = ConfigKeys.newConfigKey( + new TypeToken<Iterable<String>>(){}, "mongodb.router.config.servers", "List of host names and ports of the config servers"); + + AttributeSensor<Integer> SHARD_COUNT = Sensors.newIntegerSensor("mongodb.router.config.shard.count", "Number of shards that have been added"); + + AttributeSensor<Boolean> RUNNING = Sensors.newBooleanSensor("mongodb.router.running", "Indicates that the router is running, " + + "and can be used to add shards, but is not necessarity available for CRUD operations (e.g. if no shards have been added)"); + + /** + * @throws IllegalStateException if times out. + */ + public void waitForServiceUp(Duration duration); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterCluster.java new file mode 100644 index 0000000..333a1bd --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterCluster.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import java.util.Collection; + +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.Sensors; + +@ImplementedBy(MongoDBRouterClusterImpl.class) +public interface MongoDBRouterCluster extends DynamicCluster { + + AttributeSensor<MongoDBRouter> ANY_ROUTER = Sensors.newSensor(MongoDBRouter.class, "mongodb.routercluster.any", + "When set, can be used to access one of the routers in the cluster (usually the first). This will only be set once " + + "at least one shard has been added, and the router is available for CRUD operations"); + + AttributeSensor<MongoDBRouter> ANY_RUNNING_ROUTER = Sensors.newSensor(MongoDBRouter.class, "mongodb.routercluster.any.running", + "When set, can be used to access one of the running routers in the cluster (usually the first). This should only be used " + + "to add shards as it does not guarantee that the router is available for CRUD operations"); + + /** + * @return One of the routers in the cluster if available, null otherwise + */ + MongoDBRouter getAnyRouter(); + + /** + * @return One of the running routers in the cluster. This should only be used to add shards as it does not guarantee that + * the router is available for CRUD operations + */ + MongoDBRouter getAnyRunningRouter(); + + /** + * @return All routers in the cluster + */ + Collection<MongoDBRouter> getRouters(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterClusterImpl.java new file mode 100644 index 0000000..b905c10 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterClusterImpl.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import java.util.Collection; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.EntityPredicates; +import brooklyn.entity.group.AbstractMembershipTrackingPolicy; +import brooklyn.entity.group.DynamicClusterImpl; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.location.Location; +import brooklyn.policy.PolicySpec; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +public class MongoDBRouterClusterImpl extends DynamicClusterImpl implements MongoDBRouterCluster { + + @Override + public void init() { + super.init(); + subscribeToChildren(this, MongoDBRouter.RUNNING, new SensorEventListener<Boolean>() { + @Override public void onEvent(SensorEvent<Boolean> event) { + setAnyRouter(); + } + }); + } + + @Override + public void start(Collection<? extends Location> locations) { + super.start(locations); + addPolicy(PolicySpec.create(MemberTrackingPolicy.class) + .displayName("Router cluster membership tracker") + .configure("group", this)); + } + + public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { + @Override protected void onEntityEvent(EventType type, Entity member) { + ((MongoDBRouterClusterImpl)super.entity).setAnyRouter(); + } + @Override protected void onEntityRemoved(Entity member) { + ((MongoDBRouterClusterImpl)super.entity).setAnyRouter(); + } + @Override protected void onEntityChange(Entity member) { + ((MongoDBRouterClusterImpl)super.entity).setAnyRouter(); + } + } + + protected void setAnyRouter() { + setAttribute(MongoDBRouterCluster.ANY_ROUTER, Iterables.tryFind(getRouters(), + EntityPredicates.attributeEqualTo(Startable.SERVICE_UP, true)).orNull()); + + setAttribute( + MongoDBRouterCluster.ANY_RUNNING_ROUTER, + Iterables.tryFind(getRouters(), EntityPredicates.attributeEqualTo(MongoDBRouter.RUNNING, true)) + .orNull()); + } + + @Override + public Collection<MongoDBRouter> getRouters() { + return ImmutableList.copyOf(Iterables.filter(getMembers(), MongoDBRouter.class)); + } + + @Override + protected EntitySpec<?> getMemberSpec() { + if (super.getMemberSpec() != null) + return super.getMemberSpec(); + return EntitySpec.create(MongoDBRouter.class); + } + + @Override + public MongoDBRouter getAnyRouter() { + return getAttribute(MongoDBRouterCluster.ANY_ROUTER); + } + + @Override + public MongoDBRouter getAnyRunningRouter() { + return getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterDriver.java new file mode 100644 index 0000000..3c7a30c --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterDriver.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import brooklyn.entity.basic.SoftwareProcessDriver; + +public interface MongoDBRouterDriver extends SoftwareProcessDriver { + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterImpl.java new file mode 100644 index 0000000..cbbc6b8 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterImpl.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.entity.nosql.mongodb.MongoDBClientSupport; + +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.event.feed.function.FunctionFeed; +import brooklyn.event.feed.function.FunctionPollConfig; + +import com.google.common.base.Functions; + +public class MongoDBRouterImpl extends SoftwareProcessImpl implements MongoDBRouter { + + private volatile FunctionFeed functionFeed; + + @Override + public Class<?> getDriverInterface() { + return MongoDBRouterDriver.class; + } + + @Override + protected void connectSensors() { + super.connectSensors(); + functionFeed = FunctionFeed.builder() + .entity(this) + .poll(new FunctionPollConfig<Boolean, Boolean>(RUNNING) + .period(5, TimeUnit.SECONDS) + .callable(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + MongoDBClientSupport clientSupport = MongoDBClientSupport.forServer(MongoDBRouterImpl.this); + return clientSupport.ping(); + } + }) + .onException(Functions.<Boolean>constant(false))) + .poll(new FunctionPollConfig<Boolean, Boolean>(SERVICE_UP) + .period(5, TimeUnit.SECONDS) + .callable(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + // TODO: This is the same as in AbstractMongoDBSshDriver.isRunning. + // This feels like the right place. But feels like can be more consistent with different + // MongoDB types using the FunctionFeed. + MongoDBClientSupport clientSupport = MongoDBClientSupport.forServer(MongoDBRouterImpl.this); + return clientSupport.ping() && MongoDBRouterImpl.this.getAttribute(SHARD_COUNT) > 0; + } + }) + .onException(Functions.<Boolean>constant(false))) + .poll(new FunctionPollConfig<Integer, Integer>(SHARD_COUNT) + .period(5, TimeUnit.SECONDS) + .callable(new Callable<Integer>() { + public Integer call() throws Exception { + MongoDBClientSupport clientSupport = MongoDBClientSupport.forServer(MongoDBRouterImpl.this); + return (int) clientSupport.getShardCount(); + } + }) + .onException(Functions.<Integer>constant(-1))) + .build(); + } + + @Override + protected void disconnectSensors() { + super.disconnectSensors(); + if (functionFeed != null) functionFeed.stop(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterSshDriver.java new file mode 100644 index 0000000..422b9ac --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterSshDriver.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import org.apache.brooklyn.entity.nosql.mongodb.AbstractMongoDBSshDriver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.location.basic.SshMachineLocation; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; + +public class MongoDBRouterSshDriver extends AbstractMongoDBSshDriver implements MongoDBRouterDriver { + + private static final Logger LOG = LoggerFactory.getLogger(MongoDBRouterSshDriver.class); + + public MongoDBRouterSshDriver(MongoDBRouterImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + public void launch() { + String configdb = Joiner.on(",").join(getEntity().getConfig(MongoDBRouter.CONFIG_SERVERS)); + ImmutableList.Builder<String> argsBuilder = getArgsBuilderWithDefaults(MongoDBRouterImpl.class.cast(getEntity())) + .add("--configdb", configdb); + + String args = Joiner.on(" ").join(argsBuilder.build()); + String command = String.format("%s/bin/mongos %s > out.log 2> err.log < /dev/null", getExpandedInstallDir(), args); + LOG.info(command); + newScript(LAUNCHING) + .updateTaskAndFailOnNonZeroResultCode() + .body.append(command).execute(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardCluster.java new file mode 100644 index 0000000..edf7d7a --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardCluster.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; + +@ImplementedBy(MongoDBShardClusterImpl.class) +public interface MongoDBShardCluster extends DynamicCluster { + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java new file mode 100644 index 0000000..7eb2571 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.entity.nosql.mongodb.MongoDBClientSupport; +import org.apache.brooklyn.entity.nosql.mongodb.MongoDBReplicaSet; +import org.apache.brooklyn.entity.nosql.mongodb.MongoDBServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.Entity; +import brooklyn.entity.group.DynamicClusterImpl; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.location.Location; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; + +public class MongoDBShardClusterImpl extends DynamicClusterImpl implements MongoDBShardCluster { + + private static final Logger LOG = LoggerFactory.getLogger(MongoDBShardClusterImpl.class); + + // TODO: Need to use attributes for this in order to support brooklyn restart + private Set<Entity> addedMembers = Sets.newConcurrentHashSet(); + + // TODO: Need to use attributes for this in order to support brooklyn restart + private Set<Entity> addingMembers = Sets.newConcurrentHashSet(); + + /** + * For shard addition and removal. + * Used for retrying. + * + * TODO Should use ExecutionManager. + */ + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + @Override + protected EntitySpec<?> getMemberSpec() { + EntitySpec<?> result = super.getMemberSpec(); + if (result == null) + result = EntitySpec.create(MongoDBReplicaSet.class); + result.configure(DynamicClusterImpl.INITIAL_SIZE, getConfig(MongoDBShardedDeployment.SHARD_REPLICASET_SIZE)); + return result; + } + + @Override + public void start(Collection<? extends Location> locations) { + subscribeToMembers(this, Startable.SERVICE_UP, new SensorEventListener<Boolean>() { + public void onEvent(SensorEvent<Boolean> event) { + addShards(); + } + }); + + super.start(locations); + + MongoDBRouterCluster routers = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER); + subscribe(routers, MongoDBRouterCluster.ANY_RUNNING_ROUTER, new SensorEventListener<MongoDBRouter>() { + public void onEvent(SensorEvent<MongoDBRouter> event) { + if (event.getValue() != null) + addShards(); + } + }); + } + + @Override + public void stop() { + // TODO Note that after this the executor will not run if the set is restarted. + executor.shutdownNow(); + super.stop(); + } + + @Override + public void onManagementStopped() { + super.onManagementStopped(); + executor.shutdownNow(); + } + + protected void addShards() { + MongoDBRouter router = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER).getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER); + if (router == null) { + if (LOG.isTraceEnabled()) LOG.trace("Not adding shards because no running router in {}", this); + return; + } + + for (Entity member : this.getMembers()) { + if (member.getAttribute(Startable.SERVICE_UP) && !addingMembers.contains(member)) { + LOG.info("{} adding shard {}", new Object[] {MongoDBShardClusterImpl.this, member}); + addingMembers.add(member); + addShardAsync(member); + } + } + } + + protected void addShardAsync(final Entity replicaSet) { + final Duration timeout = Duration.minutes(20); + final Stopwatch stopwatch = Stopwatch.createStarted(); + final AtomicInteger attempts = new AtomicInteger(); + + // TODO Don't use executor, use ExecutionManager; but following pattern in MongoDBReplicaSetImpl for now. + executor.submit(new Runnable() { + @Override + public void run() { + boolean reschedule; + MongoDBRouter router = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER).getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER); + if (router == null) { + LOG.debug("Rescheduling adding shard {} because no running router for cluster {}", replicaSet, this); + reschedule = true; + } else { + MongoDBClientSupport client; + try { + client = MongoDBClientSupport.forServer(router); + } catch (UnknownHostException e) { + throw Exceptions.propagate(e); + } + + MongoDBServer primary = replicaSet.getAttribute(MongoDBReplicaSet.PRIMARY_ENTITY); + if (primary != null) { + String addr = String.format("%s:%d", primary.getAttribute(MongoDBServer.SUBNET_HOSTNAME), primary.getAttribute(MongoDBServer.PORT)); + String replicaSetURL = ((MongoDBReplicaSet) replicaSet).getName() + "/" + addr; + boolean added = client.addShardToRouter(replicaSetURL); + if (added) { + LOG.info("{} added shard {} via {}", new Object[] {MongoDBShardClusterImpl.this, replicaSetURL, router}); + addedMembers.add(replicaSet); + reschedule = false; + } else { + LOG.debug("Rescheduling addition of shard {} because add failed via router {}", replicaSetURL, router); + reschedule = true; + } + } else { + LOG.debug("Rescheduling addition of shard {} because primary is null", replicaSet); + reschedule = true; + } + } + + if (reschedule) { + int numAttempts = attempts.incrementAndGet(); + if (numAttempts > 1 && timeout.toMilliseconds() > stopwatch.elapsed(TimeUnit.MILLISECONDS)) { + executor.schedule(this, 3, TimeUnit.SECONDS); + } else { + LOG.warn("Timeout after {} attempts ({}) adding shard {}; aborting", + new Object[] {numAttempts, Time.makeTimeStringRounded(stopwatch), replicaSet}); + addingMembers.remove(replicaSet); + } + } + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeployment.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeployment.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeployment.java new file mode 100644 index 0000000..3383887 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeployment.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import org.apache.brooklyn.catalog.Catalog; +import org.apache.brooklyn.entity.nosql.mongodb.MongoDBReplicaSet; +import org.apache.brooklyn.entity.nosql.mongodb.MongoDBServer; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.Entity; +import brooklyn.entity.Group; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.entity.trait.Startable; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; +import brooklyn.util.time.Duration; + +import com.google.common.reflect.TypeToken; + +@Catalog(name="MongoDB Sharded Deployment", + description="MongoDB (from \"humongous\") is a scalable, high-performance, open source NoSQL database", + iconUrl="classpath:///mongodb-logo.png") +@ImplementedBy(MongoDBShardedDeploymentImpl.class) +public interface MongoDBShardedDeployment extends Entity, Startable { + @SetFromFlag("configClusterSize") + ConfigKey<Integer> CONFIG_CLUSTER_SIZE = ConfigKeys.newIntegerConfigKey("mongodb.config.cluster.size", + "Number of config servers", 3); + + @SetFromFlag("initialRouterClusterSize") + ConfigKey<Integer> INITIAL_ROUTER_CLUSTER_SIZE = ConfigKeys.newIntegerConfigKey("mongodb.router.cluster.initial.size", + "Initial number of routers (mongos)", 0); + + @SetFromFlag("initialShardClusterSize") + ConfigKey<Integer> INITIAL_SHARD_CLUSTER_SIZE = ConfigKeys.newIntegerConfigKey("mongodb.shard.cluster.initial.size", + "Initial number of shards (replicasets)", 2); + + @SetFromFlag("shardReplicaSetSize") + ConfigKey<Integer> SHARD_REPLICASET_SIZE = ConfigKeys.newIntegerConfigKey("mongodb.shard.replicaset.size", + "Number of servers (mongod) in each shard (replicaset)", 3); + + @SetFromFlag("routerUpTimeout") + ConfigKey<Duration> ROUTER_UP_TIMEOUT = ConfigKeys.newConfigKey(Duration.class, "mongodb.router.up.timeout", + "Maximum time to wait for the routers to become available before adding the shards", Duration.FIVE_MINUTES); + + @SetFromFlag("coLocatedRouterGroup") + ConfigKey<Group> CO_LOCATED_ROUTER_GROUP = ConfigKeys.newConfigKey(Group.class, "mongodb.colocated.router.group", + "Group to be monitored for the addition of new CoLocatedMongoDBRouter entities"); + + @SuppressWarnings("serial") + ConfigKey<EntitySpec<?>> MONGODB_ROUTER_SPEC = ConfigKeys.newConfigKey( + new TypeToken<EntitySpec<?>>() {}, + "mongodb.router.spec", + "Spec for Router instances", + EntitySpec.create(MongoDBRouter.class)); + + @SuppressWarnings("serial") + ConfigKey<EntitySpec<?>> MONGODB_REPLICA_SET_SPEC = ConfigKeys.newConfigKey( + new TypeToken<EntitySpec<?>>() {}, + "mongodb.replicaset.spec", + "Spec for Replica Set", + EntitySpec.create(MongoDBReplicaSet.class) + .configure(MongoDBReplicaSet.MEMBER_SPEC, EntitySpec.create(MongoDBServer.class))); + + @SuppressWarnings("serial") + ConfigKey<EntitySpec<?>> MONGODB_CONFIG_SERVER_SPEC = ConfigKeys.newConfigKey( + new TypeToken<EntitySpec<?>>() {}, + "mongodb.configserver.spec", + "Spec for Config Server instances", + EntitySpec.create(MongoDBConfigServer.class)); + + public static AttributeSensor<MongoDBConfigServerCluster> CONFIG_SERVER_CLUSTER = Sensors.newSensor( + MongoDBConfigServerCluster.class, "mongodbshardeddeployment.configservers", "Config servers"); + public static AttributeSensor<MongoDBRouterCluster> ROUTER_CLUSTER = Sensors.newSensor( + MongoDBRouterCluster.class, "mongodbshardeddeployment.routers", "Routers"); + + public static AttributeSensor<MongoDBShardCluster> SHARD_CLUSTER = Sensors.newSensor( + MongoDBShardCluster.class, "mongodbshardeddeployment.shards", "Shards"); + + public MongoDBConfigServerCluster getConfigCluster(); + + public MongoDBRouterCluster getRouterCluster(); + + public MongoDBShardCluster getShardCluster(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeploymentImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeploymentImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeploymentImpl.java new file mode 100644 index 0000000..74f0623 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeploymentImpl.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.mongodb.sharding; + +import static brooklyn.event.basic.DependentConfiguration.attributeWhenReady; + +import java.util.Collection; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.enricher.Enrichers; +import brooklyn.entity.Entity; +import brooklyn.entity.Group; +import brooklyn.entity.basic.AbstractEntity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.basic.ServiceStateLogic; +import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic; +import brooklyn.entity.group.AbstractMembershipTrackingPolicy; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.location.Location; +import brooklyn.policy.PolicySpec; +import brooklyn.util.exceptions.Exceptions; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class MongoDBShardedDeploymentImpl extends AbstractEntity implements MongoDBShardedDeployment { + + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(MongoDBShardedDeploymentImpl.class); + + @Override + public void init() { + super.init(); + + setAttribute(CONFIG_SERVER_CLUSTER, addChild(EntitySpec.create(MongoDBConfigServerCluster.class) + .configure(MongoDBConfigServerCluster.MEMBER_SPEC, getConfig(MONGODB_CONFIG_SERVER_SPEC)) + .configure(DynamicCluster.INITIAL_SIZE, getConfig(CONFIG_CLUSTER_SIZE)))); + setAttribute(ROUTER_CLUSTER, addChild(EntitySpec.create(MongoDBRouterCluster.class) + .configure(MongoDBRouterCluster.MEMBER_SPEC, getConfig(MONGODB_ROUTER_SPEC)) + .configure(DynamicCluster.INITIAL_SIZE, getConfig(INITIAL_ROUTER_CLUSTER_SIZE)) + .configure(MongoDBRouter.CONFIG_SERVERS, attributeWhenReady(getAttribute(CONFIG_SERVER_CLUSTER), MongoDBConfigServerCluster.CONFIG_SERVER_ADDRESSES)))); + setAttribute(SHARD_CLUSTER, addChild(EntitySpec.create(MongoDBShardCluster.class) + .configure(MongoDBShardCluster.MEMBER_SPEC, getConfig(MONGODB_REPLICA_SET_SPEC)) + .configure(DynamicCluster.INITIAL_SIZE, getConfig(INITIAL_SHARD_CLUSTER_SIZE)))); + addEnricher(Enrichers.builder() + .propagating(MongoDBConfigServerCluster.CONFIG_SERVER_ADDRESSES) + .from(getAttribute(CONFIG_SERVER_CLUSTER)) + .build()); + + ServiceNotUpLogic.updateNotUpIndicator(this, Attributes.SERVICE_STATE_ACTUAL, "stopped"); + } + + @Override + public void start(Collection<? extends Location> locations) { + ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING); + try { + final MongoDBRouterCluster routers = getAttribute(ROUTER_CLUSTER); + final MongoDBShardCluster shards = getAttribute(SHARD_CLUSTER); + List<DynamicCluster> clusters = ImmutableList.of(getAttribute(CONFIG_SERVER_CLUSTER), routers, shards); + Entities.invokeEffectorList(this, clusters, Startable.START, ImmutableMap.of("locations", locations)) + .get(); + + if (getConfigRaw(MongoDBShardedDeployment.CO_LOCATED_ROUTER_GROUP, true).isPresent()) { + addPolicy(PolicySpec.create(ColocatedRouterTrackingPolicy.class) + .displayName("Co-located router tracker") + .configure("group", (Group)getConfig(MongoDBShardedDeployment.CO_LOCATED_ROUTER_GROUP))); + } + ServiceNotUpLogic.clearNotUpIndicator(this, Attributes.SERVICE_STATE_ACTUAL); + ServiceStateLogic.setExpectedState(this, Lifecycle.RUNNING); + } catch (Exception e) { + ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE); + // no need to log here; the effector invocation should do that + throw Exceptions.propagate(e); + } + } + + public static class ColocatedRouterTrackingPolicy extends AbstractMembershipTrackingPolicy { + @Override + protected void onEntityAdded(Entity member) { + MongoDBRouterCluster cluster = entity.getAttribute(ROUTER_CLUSTER); + cluster.addMember(member.getAttribute(CoLocatedMongoDBRouter.ROUTER)); + } + @Override + protected void onEntityRemoved(Entity member) { + MongoDBRouterCluster cluster = entity.getAttribute(ROUTER_CLUSTER); + cluster.removeMember(member.getAttribute(CoLocatedMongoDBRouter.ROUTER)); + } + }; + + @Override + public void stop() { + ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPING); + try { + Entities.invokeEffectorList(this, ImmutableList.of(getAttribute(CONFIG_SERVER_CLUSTER), getAttribute(ROUTER_CLUSTER), + getAttribute(SHARD_CLUSTER)), Startable.STOP).get(); + } catch (Exception e) { + ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE); + throw Exceptions.propagate(e); + } + ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPED); + ServiceNotUpLogic.updateNotUpIndicator(this, Attributes.SERVICE_STATE_ACTUAL, "stopped"); + } + + @Override + public void restart() { + throw new UnsupportedOperationException(); + } + + @Override + public MongoDBConfigServerCluster getConfigCluster() { + return getAttribute(CONFIG_SERVER_CLUSTER); + } + + @Override + public MongoDBRouterCluster getRouterCluster() { + return getAttribute(ROUTER_CLUSTER); + } + + @Override + public MongoDBShardCluster getShardCluster() { + return getAttribute(SHARD_CLUSTER); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisCluster.java new file mode 100644 index 0000000..26f4f1c --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisCluster.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.redis; + +import org.apache.brooklyn.catalog.Catalog; +import brooklyn.entity.Entity; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.entity.trait.Startable; + +/** + * A cluster of {@link RedisStore}s with one master and a group of slaves. + * + * The slaves are contained in a {@link DynamicCluster} which can be resized by a policy if required. + * + * TODO add sensors with aggregated Redis statistics from cluster + */ +@Catalog(name="Redis Cluster", description="Redis is an open-source, networked, in-memory, key-value data store with optional durability", iconUrl="classpath:///redis-logo.png") +@ImplementedBy(RedisClusterImpl.class) +public interface RedisCluster extends Entity, Startable { + + public RedisStore getMaster(); + + public DynamicCluster getSlaves(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisClusterImpl.java new file mode 100644 index 0000000..39c9dbe --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisClusterImpl.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.redis; + +import java.util.Collection; + +import brooklyn.enricher.Enrichers; +import brooklyn.entity.basic.AbstractEntity; +import brooklyn.entity.basic.Lifecycle; +import brooklyn.entity.basic.ServiceStateLogic; +import brooklyn.entity.basic.ServiceStateLogic.ComputeServiceIndicatorsFromChildrenAndMembers; +import brooklyn.entity.basic.ServiceStateLogic.ServiceProblemsLogic; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.Sensors; +import brooklyn.location.Location; +import brooklyn.util.collections.QuorumCheck.QuorumChecks; +import brooklyn.util.exceptions.Exceptions; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +public class RedisClusterImpl extends AbstractEntity implements RedisCluster { + + private static AttributeSensor<RedisStore> MASTER = Sensors.newSensor(RedisStore.class, "redis.master"); + private static AttributeSensor<DynamicCluster> SLAVES = Sensors.newSensor(DynamicCluster.class, "redis.slaves"); + + public RedisClusterImpl() { + } + + @Override + public RedisStore getMaster() { + return getAttribute(MASTER); + } + + @Override + public DynamicCluster getSlaves() { + return getAttribute(SLAVES); + } + + @Override + public void init() { + super.init(); + + RedisStore master = addChild(EntitySpec.create(RedisStore.class)); + setAttribute(MASTER, master); + + DynamicCluster slaves = addChild(EntitySpec.create(DynamicCluster.class) + .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(RedisSlave.class).configure(RedisSlave.MASTER, master))); + setAttribute(SLAVES, slaves); + + addEnricher(Enrichers.builder() + .propagating(RedisStore.HOSTNAME, RedisStore.ADDRESS, RedisStore.SUBNET_HOSTNAME, RedisStore.SUBNET_ADDRESS, RedisStore.REDIS_PORT) + .from(master) + .build()); + } + + @Override + protected void initEnrichers() { + super.initEnrichers(); + ServiceStateLogic.newEnricherFromChildrenUp(). + checkChildrenOnly(). + requireUpChildren(QuorumChecks.all()). + configure(ComputeServiceIndicatorsFromChildrenAndMembers.IGNORE_ENTITIES_WITH_THESE_SERVICE_STATES, ImmutableSet.<Lifecycle>of()). + addTo(this); + } + + @Override + public void start(Collection<? extends Location> locations) { + ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING); + ServiceProblemsLogic.clearProblemsIndicator(this, START); + try { + doStart(locations); + ServiceStateLogic.setExpectedState(this, Lifecycle.RUNNING); + } catch (Exception e) { + ServiceProblemsLogic.updateProblemsIndicator(this, START, "Start failed with error: "+e); + ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE); + throw Exceptions.propagate(e); + } + } + + private void doStart(Collection<? extends Location> locations) { + RedisStore master = getMaster(); + master.invoke(RedisStore.START, ImmutableMap.<String, Object>of("locations", ImmutableList.copyOf(locations))).getUnchecked(); + + DynamicCluster slaves = getSlaves(); + slaves.invoke(DynamicCluster.START, ImmutableMap.<String, Object>of("locations", ImmutableList.copyOf(locations))).getUnchecked(); + } + + @Override + public void stop() { + ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPING); + try { + doStop(); + ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPED); + } catch (Exception e) { + ServiceProblemsLogic.updateProblemsIndicator(this, STOP, "Stop failed with error: "+e); + ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE); + throw Exceptions.propagate(e); + } + } + + private void doStop() { + getSlaves().invoke(DynamicCluster.STOP, ImmutableMap.<String, Object>of()).getUnchecked(); + getMaster().invoke(RedisStore.STOP, ImmutableMap.<String, Object>of()).getUnchecked(); + } + + @Override + public void restart() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShard.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShard.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShard.java new file mode 100644 index 0000000..09d71b3 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShard.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.redis; + +import brooklyn.entity.Entity; +import brooklyn.entity.proxying.ImplementedBy; + +@ImplementedBy(RedisShardImpl.class) +public interface RedisShard extends Entity { +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShardImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShardImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShardImpl.java new file mode 100644 index 0000000..87396f5 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShardImpl.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.redis; + +import brooklyn.entity.basic.AbstractEntity; + +public class RedisShardImpl extends AbstractEntity implements RedisShard { + public RedisShardImpl() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlave.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlave.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlave.java new file mode 100644 index 0000000..af91beb --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlave.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.redis; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.util.flags.SetFromFlag; + +/** + * A {@link RedisStore} configured as a slave. + */ +@ImplementedBy(RedisSlaveImpl.class) +public interface RedisSlave extends RedisStore { + + @SetFromFlag("master") + ConfigKey<RedisStore> MASTER = new BasicConfigKey<RedisStore>(RedisStore.class, "redis.master", "Redis master"); + + @SetFromFlag("redisConfigTemplateUrl") + ConfigKey<String> REDIS_CONFIG_TEMPLATE_URL = new BasicConfigKey<String>( + String.class, "redis.config.templateUrl", "Template file (in freemarker format) for the redis.conf config file", + "classpath://org/apache/brooklyn/entity/nosql/redis/slave.conf"); + + RedisStore getMaster(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlaveImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlaveImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlaveImpl.java new file mode 100644 index 0000000..b58ce7d --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlaveImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.redis; + + +/** + * A {@link RedisStore} configured as a slave. + */ +public class RedisSlaveImpl extends RedisStoreImpl implements RedisSlave { + + public RedisSlaveImpl() { + } + + @Override + public RedisStore getMaster() { + return getConfig(MASTER); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStore.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStore.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStore.java new file mode 100644 index 0000000..8d2cef1 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStore.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.redis; + +import org.apache.brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; + +/** + * An entity that represents a Redis key-value store service. + */ +@Catalog(name="Redis Server", description="Redis is an open-source, networked, in-memory, key-value data store with optional durability", iconUrl="classpath:///redis-logo.png") +@ImplementedBy(RedisStoreImpl.class) +public interface RedisStore extends SoftwareProcess { + + @SetFromFlag("version") + ConfigKey<String> SUGGESTED_VERSION = + ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "2.8.4"); + + @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( + SoftwareProcess.DOWNLOAD_URL, "http://download.redis.io/releases/redis-${version}.tar.gz"); + + @SetFromFlag("redisPort") + PortAttributeSensorAndConfigKey REDIS_PORT = new PortAttributeSensorAndConfigKey("redis.port", "Redis port number", "6379+"); + + @SetFromFlag("redisConfigTemplateUrl") + ConfigKey<String> REDIS_CONFIG_TEMPLATE_URL = ConfigKeys.newConfigKey( + "redis.config.templateUrl", "Template file (in freemarker format) for the redis.conf config file", + "classpath://org/apache/brooklyn/entity/nosql/redis/redis.conf"); + + AttributeSensor<Integer> UPTIME = Sensors.newIntegerSensor("redis.uptime", "Redis uptime in seconds"); + + // See http://redis.io/commands/info for details of all information available + AttributeSensor<Integer> TOTAL_CONNECTIONS_RECEIVED = Sensors.newIntegerSensor("redis.connections.received.total", "Total number of connections accepted by the server"); + AttributeSensor<Integer> TOTAL_COMMANDS_PROCESSED = Sensors.newIntegerSensor("redis.commands.processed.total", "Total number of commands processed by the server"); + AttributeSensor<Integer> EXPIRED_KEYS = Sensors.newIntegerSensor("redis.keys.expired", "Total number of key expiration events"); + AttributeSensor<Integer> EVICTED_KEYS = Sensors.newIntegerSensor("redis.keys.evicted", "Number of evicted keys due to maxmemory limit"); + AttributeSensor<Integer> KEYSPACE_HITS = Sensors.newIntegerSensor("redis.keyspace.hits", "Number of successful lookup of keys in the main dictionary"); + AttributeSensor<Integer> KEYSPACE_MISSES = Sensors.newIntegerSensor("redis.keyspace.misses", "Number of failed lookup of keys in the main dictionary"); + + String getAddress(); + + Integer getRedisPort(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreDriver.java new file mode 100644 index 0000000..ba77cfd --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreDriver.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.redis; + +import brooklyn.entity.basic.SoftwareProcessDriver; + +public interface RedisStoreDriver extends SoftwareProcessDriver { + + String getRunDir(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreImpl.java new file mode 100644 index 0000000..f556bcf --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreImpl.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.redis; + +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.event.feed.ssh.SshFeed; +import brooklyn.event.feed.ssh.SshPollConfig; +import brooklyn.event.feed.ssh.SshPollValue; +import brooklyn.event.feed.ssh.SshValueFunctions; +import brooklyn.location.Location; +import brooklyn.location.MachineLocation; +import brooklyn.location.basic.SshMachineLocation; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Optional; +import com.google.common.base.Predicates; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; + +/** + * An entity that represents a Redis key-value store service. + */ +public class RedisStoreImpl extends SoftwareProcessImpl implements RedisStore { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(RedisStore.class); + + private transient SshFeed sshFeed; + + public RedisStoreImpl() { + } + + @Override + protected void connectSensors() { + super.connectSensors(); + + connectServiceUpIsRunning(); + + // Find an SshMachineLocation for the UPTIME feed + Optional<Location> location = Iterables.tryFind(getLocations(), Predicates.instanceOf(SshMachineLocation.class)); + if (!location.isPresent()) throw new IllegalStateException("Could not find SshMachineLocation in list of locations"); + SshMachineLocation machine = (SshMachineLocation) location.get(); + String statsCommand = getDriver().getRunDir() + "/bin/redis-cli -p " + getRedisPort() + " info stats"; + + sshFeed = SshFeed.builder() + .entity(this) + .machine(machine) + .period(5, TimeUnit.SECONDS) + .poll(new SshPollConfig<Integer>(UPTIME) + .command(getDriver().getRunDir() + "/bin/redis-cli -p " + getRedisPort() + " info server") + .onFailureOrException(Functions.constant(-1)) + .onSuccess(infoFunction("uptime_in_seconds"))) + .poll(new SshPollConfig<Integer>(TOTAL_CONNECTIONS_RECEIVED) + .command(statsCommand) + .onFailureOrException(Functions.constant(-1)) + .onSuccess(infoFunction("total_connections_received"))) + .poll(new SshPollConfig<Integer>(TOTAL_COMMANDS_PROCESSED) + .command(statsCommand) + .onFailureOrException(Functions.constant(-1)) + .onSuccess(infoFunction("total_commands_processed"))) + .poll(new SshPollConfig<Integer>(EXPIRED_KEYS) + .command(statsCommand) + .onFailureOrException(Functions.constant(-1)) + .onSuccess(infoFunction("expired_keys"))) + .poll(new SshPollConfig<Integer>(EVICTED_KEYS) + .command(statsCommand) + .onFailureOrException(Functions.constant(-1)) + .onSuccess(infoFunction("evicted_keys"))) + .poll(new SshPollConfig<Integer>(KEYSPACE_HITS) + .command(statsCommand) + .onFailureOrException(Functions.constant(-1)) + .onSuccess(infoFunction("keyspace_hits"))) + .poll(new SshPollConfig<Integer>(KEYSPACE_MISSES) + .command(statsCommand) + .onFailureOrException(Functions.constant(-1)) + .onSuccess(infoFunction("keyspace_misses"))) + .build(); + } + + /** + * Create a {@link Function} to retrieve a particular field value from a {@code redis-cli info} + * command. + * + * @param field the info field to retrieve and convert + * @return a new function that converts a {@link SshPollValue} to an {@link Integer} + */ + private static Function<SshPollValue, Integer> infoFunction(final String field) { + return Functions.compose(new Function<String, Integer>() { + @Override + public Integer apply(@Nullable String input) { + Optional<String> line = Iterables.tryFind(Splitter.on('\n').split(input), Predicates.containsPattern(field + ":")); + if (line.isPresent()) { + String data = line.get().trim(); + int colon = data.indexOf(":"); + return Integer.parseInt(data.substring(colon + 1)); + } else { + throw new IllegalStateException("Data for field "+field+" not found: "+input); + } + } + }, SshValueFunctions.stdout()); + } + + @Override + public void disconnectSensors() { + disconnectServiceUpIsRunning(); + if (sshFeed != null) sshFeed.stop(); + super.disconnectSensors(); + } + + @Override + public Class<?> getDriverInterface() { + return RedisStoreDriver.class; + } + + @Override + public RedisStoreDriver getDriver() { + return (RedisStoreDriver) super.getDriver(); + } + + @Override + public String getAddress() { + MachineLocation machine = getMachineOrNull(); + return (machine != null) ? machine.getAddress().getHostAddress() : null; + } + + @Override + public Integer getRedisPort() { + return getAttribute(RedisStore.REDIS_PORT); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreSshDriver.java new file mode 100644 index 0000000..c362e4e --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreSshDriver.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.redis; + +import static java.lang.String.format; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; +import brooklyn.entity.basic.Entities; +import brooklyn.location.Location; +import brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.os.Os; +import brooklyn.util.ssh.BashCommands; + +import com.google.common.collect.ImmutableList; + +/** + * Start a {@link RedisStore} in a {@link Location} accessible over ssh. + */ +public class RedisStoreSshDriver extends AbstractSoftwareProcessSshDriver implements RedisStoreDriver { + + private static final Logger LOG = LoggerFactory.getLogger(RedisStoreSshDriver.class); + + public RedisStoreSshDriver(RedisStoreImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + public void preInstall() { + resolver = Entities.newDownloader(this); + setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("redis-%s", getVersion())))); + } + + @Override + public void install() { + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + + MutableMap<String, String> installGccPackageFlags = MutableMap.of( + "onlyifmissing", "gcc", + "yum", "gcc", + "apt", "gcc", + "port", null); + MutableMap<String, String> installMakePackageFlags = MutableMap.of( + "onlyifmissing", "make", + "yum", "make", + "apt", "make", + "port", null); + + List<String> commands = ImmutableList.<String>builder() + .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) + .add(BashCommands.INSTALL_TAR) + .add(BashCommands.INSTALL_CURL) + .add(BashCommands.installPackage(installGccPackageFlags, "redis-prerequisites-gcc")) + .add(BashCommands.installPackage(installMakePackageFlags, "redis-prerequisites-make")) + .add("tar xzfv " + saveAs) + .add(format("cd redis-%s", getVersion())) + .add("pushd deps") + .add("make lua hiredis linenoise") + .add("popd") + .add("make clean && make") + .build(); + + newScript(INSTALLING) + .failOnNonZeroResultCode() + .body.append(commands).execute(); + } + + @Override + public void customize() { + newScript(MutableMap.of("usePidFile", false), CUSTOMIZING) + .failOnNonZeroResultCode() + .body.append( + format("cd %s", getExpandedInstallDir()), + "make install PREFIX="+getRunDir()) + .execute(); + + copyTemplate(getEntity().getConfig(RedisStore.REDIS_CONFIG_TEMPLATE_URL), "redis.conf"); + } + + @Override + public void launch() { + // TODO Should we redirect stdout/stderr: format(" >> %s/console 2>&1 </dev/null &", getRunDir()) + newScript(MutableMap.of("usePidFile", false), LAUNCHING) + .failOnNonZeroResultCode() + .body.append("./bin/redis-server redis.conf") + .execute(); + } + + @Override + public boolean isRunning() { + return newScript(MutableMap.of("usePidFile", false), CHECK_RUNNING) + .body.append("./bin/redis-cli -p " + getEntity().getAttribute(RedisStore.REDIS_PORT) + " ping > /dev/null") + .execute() == 0; + } + + /** + * Restarts redis with the current configuration. + */ + @Override + public void stop() { + int exitCode = newScript(MutableMap.of("usePidFile", false), STOPPING) + .body.append("./bin/redis-cli -p " + getEntity().getAttribute(RedisStore.REDIS_PORT) + " shutdown") + .execute(); + // TODO: Good enough? Will cause warnings when trying to stop a server that is already not running. + if (exitCode != 0) { + LOG.warn("Unexpected exit code when stopping {}: {}", entity, exitCode); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakCluster.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakCluster.java new file mode 100644 index 0000000..99df1a2 --- /dev/null +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakCluster.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.riak; + +import java.net.URI; +import java.util.Map; + +import org.apache.brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; +import brooklyn.util.time.Duration; + +import com.google.common.reflect.TypeToken; + +@Catalog(name="Riak Cluster", description="Riak is a distributed NoSQL key-value data store that offers " + + "extremely high availability, fault tolerance, operational simplicity and scalability.") +@ImplementedBy(RiakClusterImpl.class) +public interface RiakCluster extends DynamicCluster { + + @SuppressWarnings("serial") + AttributeSensor<Map<Entity, String>> RIAK_CLUSTER_NODES = Sensors.newSensor( + new TypeToken<Map<Entity, String>>() {}, + "riak.cluster.nodes", "Names of all active Riak nodes in the cluster <Entity,Riak Name>"); + + @SetFromFlag("delayBeforeAdvertisingCluster") + ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "riak.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.seconds(2 * 60)); + + AttributeSensor<Boolean> IS_CLUSTER_INIT = Sensors.newBooleanSensor("riak.cluster.isClusterInit", "Flag to determine if the cluster was already initialized"); + + AttributeSensor<Boolean> IS_FIRST_NODE_SET = Sensors.newBooleanSensor("riak.cluster.isFirstNodeSet", "Flag to determine if the first node has been set"); + + AttributeSensor<String> NODE_LIST = Sensors.newStringSensor("riak.cluster.nodeList", "List of nodes (including ports), comma separated"); + + AttributeSensor<String> NODE_LIST_PB_PORT = Sensors.newStringSensor("riak.cluster.nodeListPbPort", "List of nodes (including ports for riak db clients), comma separated"); + + AttributeSensor<URI> RIAK_CONSOLE_URI = Attributes.MAIN_URI; + + AttributeSensor<Integer> NODE_GETS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.gets.1m.perNode", "Gets in the last minute, averaged across cluster"); + AttributeSensor<Integer> NODE_PUTS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.puts.1m.perNode", "Puts in the last minute, averaged across cluster"); + AttributeSensor<Integer> NODE_OPS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.ops.1m.perNode", "Sum of node gets and puts in the last minute, averaged across cluster"); + +}
