makes connection string available on MongoDBReplicaSet
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/16b8b1e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/16b8b1e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/16b8b1e3 Branch: refs/heads/master Commit: 16b8b1e393d525c88ba5b540ad880dab899888d6 Parents: b3ce256 Author: Robert Moss <[email protected]> Authored: Tue Sep 15 15:06:43 2015 +0100 Committer: Robert Moss <[email protected]> Committed: Wed Sep 16 15:35:28 2015 +0100 ---------------------------------------------------------------------- .../nosql/mongodb/AbstractMongoDBSshDriver.java | 8 ++-- .../entity/nosql/mongodb/MongoDBReplicaSet.java | 4 +- .../nosql/mongodb/MongoDBReplicaSetImpl.java | 48 ++++++++++++++----- .../entity/nosql/mongodb/MongoDBServerImpl.java | 50 ++++++++++---------- 4 files changed, 68 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java index c182355..14c495e 100644 --- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/AbstractMongoDBSshDriver.java @@ -27,15 +27,15 @@ import org.apache.brooklyn.api.location.OsDetails; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.entity.software.base.AbstractSoftwareProcessSshDriver; import org.apache.brooklyn.entity.software.base.lifecycle.ScriptHelper; -import 
org.apache.brooklyn.util.core.internal.ssh.SshTool; -import org.apache.brooklyn.util.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.brooklyn.location.ssh.SshMachineLocation; +import org.apache.brooklyn.util.core.internal.ssh.SshTool; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.net.Networking; import org.apache.brooklyn.util.os.Os; import org.apache.brooklyn.util.ssh.BashCommands; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; import com.google.common.base.Strings; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java index 12bbe87..6ebc17e 100644 --- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSet.java @@ -26,6 +26,7 @@ import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.entity.database.DatastoreMixins.HasDatastoreUrl; import org.apache.brooklyn.entity.group.Cluster; import org.apache.brooklyn.entity.group.DynamicCluster; import org.apache.brooklyn.util.core.flags.SetFromFlag; @@ -46,7 +47,7 @@ import com.google.common.reflect.TypeToken; * @see <a href="http://docs.mongodb.org/manual/replication/">http://docs.mongodb.org/manual/replication/</a> */ @ImplementedBy(MongoDBReplicaSetImpl.class) 
-public interface MongoDBReplicaSet extends DynamicCluster, MongoDBAuthenticationMixins { +public interface MongoDBReplicaSet extends DynamicCluster, MongoDBAuthenticationMixins, HasDatastoreUrl { @SetFromFlag("replicaSetName") ConfigKey<String> REPLICA_SET_NAME = ConfigKeys.newStringConfigKey( @@ -60,6 +61,7 @@ public interface MongoDBReplicaSet extends DynamicCluster, MongoDBAuthentication @SuppressWarnings("serial") AttributeSensor<List<String>> REPLICA_SET_ENDPOINTS = Sensors.newSensor(new TypeToken<List<String>>() {}, "mongodb.replicaSet.endpoints", "Endpoints active for this replica set"); + /** * The name of the replica set. http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java index f96a56a..c4d675d 100644 --- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java @@ -54,6 +54,7 @@ import org.apache.brooklyn.util.text.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.api.client.util.Sets; import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Predicates; @@ -108,7 +109,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB @Override public boolean apply(@Nullable Entity input) { return input != null && input instanceof MongoDBServer - && ReplicaSetMemberStatus.PRIMARY.equals(input.getAttribute(MongoDBServer.REPLICA_SET_MEMBER_STATUS)); + && 
ReplicaSetMemberStatus.PRIMARY.equals(input.sensors().get(MongoDBServer.REPLICA_SET_MEMBER_STATUS)); } }; @@ -118,7 +119,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB // getSecondaries relies on instanceof check return input != null && input instanceof MongoDBServer - && ReplicaSetMemberStatus.SECONDARY.equals(input.getAttribute(MongoDBServer.REPLICA_SET_MEMBER_STATUS)); + && ReplicaSetMemberStatus.SECONDARY.equals(input.sensors().get(MongoDBServer.REPLICA_SET_MEMBER_STATUS)); } }; @@ -165,7 +166,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB @Override public String getName() { // FIXME: Names must be unique if the replica sets are used in a sharded cluster - return getConfig(REPLICA_SET_NAME) + this.getId(); + return config().get(REPLICA_SET_NAME) + this.getId(); } @Override @@ -197,7 +198,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB */ private void serverAdded(MongoDBServer server) { try { - LOG.debug("Server added: {}. SERVICE_UP: {}", server, server.getAttribute(MongoDBServer.SERVICE_UP)); + LOG.debug("Server added: {}. SERVICE_UP: {}", server, server.sensors().get(MongoDBServer.SERVICE_UP)); // Set the primary if the replica set hasn't been initialised. 
if (mustInitialise.compareAndSet(true, false)) { @@ -205,8 +206,8 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB LOG.info("First server up in {} is: {}", getName(), server); boolean replicaSetInitialised = server.initializeReplicaSet(getName(), nextMemberId.getAndIncrement()); if (replicaSetInitialised) { - setAttribute(PRIMARY_ENTITY, server); - setAttribute(Startable.SERVICE_UP, true); + sensors().set(PRIMARY_ENTITY, server); + sensors().set(Startable.SERVICE_UP, true); } else { ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(this, "initialization", "replicaset failed to initialize"); ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE); @@ -234,7 +235,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB @Override public void run() { // SERVICE_UP is not guaranteed when additional members are added to the set. - Boolean isAvailable = secondary.getAttribute(MongoDBServer.SERVICE_UP); + Boolean isAvailable = secondary.sensors().get(MongoDBServer.SERVICE_UP); MongoDBServer primary = getPrimary(); boolean reschedule; if (Boolean.TRUE.equals(isAvailable) && primary != null) { @@ -278,14 +279,14 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB if (LOG.isDebugEnabled()) LOG.debug("Scheduling removal of member from {}: {}", getName(), member); // FIXME is there a chance of race here? - if (member.equals(getAttribute(PRIMARY_ENTITY))) - setAttribute(PRIMARY_ENTITY, null); + if (member.equals(sensors().get(PRIMARY_ENTITY))) + sensors().set(PRIMARY_ENTITY, null); executor.submit(new Runnable() { @Override public void run() { // Wait until the server has been stopped before reconfiguring the set. Quoth the MongoDB doc: // for best results always shut down the mongod instance before removing it from a replica set. 
- Boolean isAvailable = member.getAttribute(MongoDBServer.SERVICE_UP); + Boolean isAvailable = member.sensors().get(MongoDBServer.SERVICE_UP); // Wait for the replica set to elect a new primary if the set is reconfiguring itself. MongoDBServer primary = getPrimary(); boolean reschedule; @@ -377,11 +378,34 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB return MutableList.copyOf(endpoints); }}) .build()); + + addEnricher(Enrichers.builder() + .aggregating(MongoDBServer.MONGO_SERVER_ENDPOINT) + .publishing(DATASTORE_URL) + .fromMembers() + .valueToReportIfNoSensors(null) + .computing(new Function<Collection<String>, String>() { + @Override + public String apply(Collection<String> input) { + Set<String> endpoints = Sets.newHashSet(); + for (String endpoint: input) { + if (!Strings.isBlank(endpoint)) { + + endpoints.add(endpoint); + } + } + String credentials = MongoDBAuthenticationUtils.usesAuthentication(MongoDBReplicaSetImpl.this) ? + String.format("%s:%s@", + config().get(MongoDBAuthenticationMixins.ROOT_USERNAME), + config().get(MongoDBAuthenticationMixins.ROOT_PASSWORD)) : ""; + return String.format("mongodb://%s%s", credentials, Strings.join(endpoints, ",")); + }}) + .build()); subscribeToMembers(this, MongoDBServer.IS_PRIMARY_FOR_REPLICA_SET, new SensorEventListener<Boolean>() { @Override public void onEvent(SensorEvent<Boolean> event) { if (Boolean.TRUE == event.getValue()) - setAttribute(PRIMARY_ENTITY, (MongoDBServer)event.getSource()); + sensors().set(PRIMARY_ENTITY, (MongoDBServer)event.getSource()); } }); @@ -396,7 +420,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB // TODO Note that after this the executor will not run if the set is restarted. 
executor.shutdownNow(); super.stop(); - setAttribute(Startable.SERVICE_UP, false); + sensors().set(Startable.SERVICE_UP, false); } @Override http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/16b8b1e3/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java ---------------------------------------------------------------------- diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java index 941dd8e..040199b 100644 --- a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java +++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java @@ -62,13 +62,13 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer super.connectSensors(); connectServiceUpIsRunning(); - int port = getAttribute(MongoDBServer.PORT); + int port = sensors().get(MongoDBServer.PORT); HostAndPort accessibleAddress = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, port); - setAttribute(MONGO_SERVER_ENDPOINT, String.format("http://%s:%d", + sensors().set(MONGO_SERVER_ENDPOINT, String.format("%s:%d", accessibleAddress.getHostText(), accessibleAddress.getPort())); - int httpConsolePort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getAttribute(HTTP_PORT)).getPort(); - setAttribute(HTTP_INTERFACE_URL, String.format("http://%s:%d", + int httpConsolePort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, sensors().get(HTTP_PORT)).getPort(); + sensors().set(HTTP_INTERFACE_URL, String.format("http://%s:%d", accessibleAddress.getHostText(), httpConsolePort)); try { @@ -85,7 +85,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer .callable(new Callable<BasicBSONObject>() { @Override public BasicBSONObject call() throws Exception { - return 
MongoDBServerImpl.this.getAttribute(SERVICE_UP) + return MongoDBServerImpl.this.sensors().get(SERVICE_UP) ? client.getServerStatus() : null; } @@ -117,8 +117,8 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer .suppressDuplicates(true)) .build(); } else { - setAttribute(IS_PRIMARY_FOR_REPLICA_SET, false); - setAttribute(IS_SECONDARY_FOR_REPLICA_SET, false); + sensors().set(IS_PRIMARY_FOR_REPLICA_SET, false); + sensors().set(IS_SECONDARY_FOR_REPLICA_SET, false); } // Take interesting details from STATUS. @@ -126,29 +126,29 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer @Override public void onEvent(SensorEvent<BasicBSONObject> event) { BasicBSONObject map = event.getValue(); if (map != null && !map.isEmpty()) { - setAttribute(UPTIME_SECONDS, map.getDouble("uptime", 0)); + sensors().set(UPTIME_SECONDS, map.getDouble("uptime", 0)); // Operations BasicBSONObject opcounters = (BasicBSONObject) map.get("opcounters"); - setAttribute(OPCOUNTERS_INSERTS, opcounters.getLong("insert", 0)); - setAttribute(OPCOUNTERS_QUERIES, opcounters.getLong("query", 0)); - setAttribute(OPCOUNTERS_UPDATES, opcounters.getLong("update", 0)); - setAttribute(OPCOUNTERS_DELETES, opcounters.getLong("delete", 0)); - setAttribute(OPCOUNTERS_GETMORE, opcounters.getLong("getmore", 0)); - setAttribute(OPCOUNTERS_COMMAND, opcounters.getLong("command", 0)); + sensors().set(OPCOUNTERS_INSERTS, opcounters.getLong("insert", 0)); + sensors().set(OPCOUNTERS_QUERIES, opcounters.getLong("query", 0)); + sensors().set(OPCOUNTERS_UPDATES, opcounters.getLong("update", 0)); + sensors().set(OPCOUNTERS_DELETES, opcounters.getLong("delete", 0)); + sensors().set(OPCOUNTERS_GETMORE, opcounters.getLong("getmore", 0)); + sensors().set(OPCOUNTERS_COMMAND, opcounters.getLong("command", 0)); // Network stats BasicBSONObject network = (BasicBSONObject) map.get("network"); - setAttribute(NETWORK_BYTES_IN, network.getLong("bytesIn", 0)); - 
setAttribute(NETWORK_BYTES_OUT, network.getLong("bytesOut", 0)); - setAttribute(NETWORK_NUM_REQUESTS, network.getLong("numRequests", 0)); + sensors().set(NETWORK_BYTES_IN, network.getLong("bytesIn", 0)); + sensors().set(NETWORK_BYTES_OUT, network.getLong("bytesOut", 0)); + sensors().set(NETWORK_NUM_REQUESTS, network.getLong("numRequests", 0)); // Replica set stats BasicBSONObject repl = (BasicBSONObject) map.get("repl"); if (isReplicaSetMember() && repl != null) { - setAttribute(IS_PRIMARY_FOR_REPLICA_SET, repl.getBoolean("ismaster")); - setAttribute(IS_SECONDARY_FOR_REPLICA_SET, repl.getBoolean("secondary")); - setAttribute(REPLICA_SET_PRIMARY_ENDPOINT, repl.getString("primary")); + sensors().set(IS_PRIMARY_FOR_REPLICA_SET, repl.getBoolean("ismaster")); + sensors().set(IS_SECONDARY_FOR_REPLICA_SET, repl.getBoolean("secondary")); + sensors().set(REPLICA_SET_PRIMARY_ENDPOINT, repl.getString("primary")); } } } @@ -165,7 +165,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer @Override public MongoDBReplicaSet getReplicaSet() { - return getConfig(MongoDBServer.REPLICA_SET); + return config().get(MongoDBServer.REPLICA_SET); } @Override @@ -186,7 +186,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer // The ReplicaSet uses REPLICA_SET_MEMBER_STATUS to determine which node to call. // // Relying on caller to respect the `false` result, to retry. 
- if (!getAttribute(IS_PRIMARY_FOR_REPLICA_SET)) { + if (!sensors().get(IS_PRIMARY_FOR_REPLICA_SET)) { LOG.warn("Attempted to add {} to replica set at server that is not primary: {}", secondary, this); return false; } @@ -195,7 +195,7 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer @Override public boolean removeMemberFromReplicaSet(MongoDBServer server) { - if (!getAttribute(IS_PRIMARY_FOR_REPLICA_SET)) { + if (!sensors().get(IS_PRIMARY_FOR_REPLICA_SET)) { LOG.warn("Attempted to remove {} from replica set at server that is not primary: {}", server, this); return false; } @@ -206,8 +206,8 @@ public class MongoDBServerImpl extends SoftwareProcessImpl implements MongoDBSer public String toString() { return Objects.toStringHelper(this) .add("id", getId()) - .add("hostname", getAttribute(HOSTNAME)) - .add("port", getAttribute(PORT)) + .add("hostname", sensors().get(HOSTNAME)) + .add("port", sensors().get(PORT)) .toString(); } }
