MySqlCluster - initial implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/bae9628b Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/bae9628b Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/bae9628b Branch: refs/heads/master Commit: bae9628bf91e825246915f0ca2aa42ce526c9b8b Parents: 0c6248f Author: Svetoslav Neykov <[email protected]> Authored: Wed Jul 29 19:40:02 2015 +0300 Committer: Svetoslav Neykov <[email protected]> Committed: Wed Aug 5 15:23:06 2015 +0300 ---------------------------------------------------------------------- .../group/AbstractMembershipTrackingPolicy.java | 3 +- .../entity/group/DynamicClusterImpl.java | 16 +- software/database/pom.xml | 2 + .../entity/database/mysql/MySqlCluster.java | 63 +++ .../entity/database/mysql/MySqlClusterImpl.java | 397 +++++++++++++++++++ .../entity/database/mysql/MySqlNode.java | 3 + .../entity/database/mysql/MySqlNodeImpl.java | 5 +- .../entity/database/mysql/mysql_master.conf | 26 ++ .../entity/database/mysql/mysql_slave.conf | 33 ++ .../util/collections/CollectionFunctionals.java | 8 + 10 files changed, 547 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/core/src/main/java/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java b/core/src/main/java/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java index 459f515..6f71a8c 100644 --- a/core/src/main/java/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java +++ b/core/src/main/java/brooklyn/entity/group/AbstractMembershipTrackingPolicy.java @@ -241,7 +241,8 @@ public abstract class AbstractMembershipTrackingPolicy extends AbstractPolicy { /** * Called when a member is removed. - * Note that entity change events may arrive after this event; they should typically be ignored. + * Note that entity change events may arrive after this event; they should typically be ignored. + * The entity could already be unmanaged at this point so limited functionality is available (i.e. can't access config keys). */ protected void onEntityRemoved(Entity member) {} } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java index 0e2f164..a384281 100644 --- a/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java +++ b/core/src/main/java/brooklyn/entity/group/DynamicClusterImpl.java @@ -159,9 +159,9 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus @Override protected void initEnrichers() { - if (getConfigRaw(UP_QUORUM_CHECK, true).isAbsent() && getConfig(INITIAL_SIZE)==0) { + if (config().getRaw(UP_QUORUM_CHECK).isAbsent() && getConfig(INITIAL_SIZE)==0) { // if initial size is 0 then override up check to allow zero if empty - setConfig(UP_QUORUM_CHECK, QuorumChecks.atLeastOneUnlessEmpty()); + config().set(UP_QUORUM_CHECK, QuorumChecks.atLeastOneUnlessEmpty()); setAttribute(SERVICE_UP, true); } else { setAttribute(SERVICE_UP, false); @@ -173,7 +173,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus @Override public void setRemovalStrategy(Function<Collection<Entity>, Entity> val) { - setConfig(REMOVAL_STRATEGY, checkNotNull(val, "removalStrategy")); + config().set(REMOVAL_STRATEGY, checkNotNull(val, "removalStrategy")); } protected Function<Collection<Entity>, Entity> getRemovalStrategy() { @@ -183,7 +183,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus @Override public void setZonePlacementStrategy(NodePlacementStrategy val) { - setConfig(ZONE_PLACEMENT_STRATEGY, checkNotNull(val, "zonePlacementStrategy")); + config().set(ZONE_PLACEMENT_STRATEGY, checkNotNull(val, "zonePlacementStrategy")); } protected NodePlacementStrategy getZonePlacementStrategy() { @@ -192,13 +192,17 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus @Override public void setZoneFailureDetector(ZoneFailureDetector val) { - setConfig(ZONE_FAILURE_DETECTOR, checkNotNull(val, "zoneFailureDetector")); + config().set(ZONE_FAILURE_DETECTOR, checkNotNull(val, "zoneFailureDetector")); } protected ZoneFailureDetector getZoneFailureDetector() { return checkNotNull(getConfig(ZONE_FAILURE_DETECTOR), "zoneFailureDetector config"); } + protected EntitySpec<?> getFirstMemberSpec() { + return getConfig(FIRST_MEMBER_SPEC); + } + protected EntitySpec<?> getMemberSpec() { return getConfig(MEMBER_SPEC); } @@ -780,7 +784,7 @@ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicClus protected Entity createNode(@Nullable Location loc, Map<?,?> flags) { EntitySpec<?> memberSpec = null; - if (getMembers().isEmpty()) memberSpec = getConfig(FIRST_MEMBER_SPEC); + if (getMembers().isEmpty()) memberSpec = getFirstMemberSpec(); if (memberSpec == null) memberSpec = getMemberSpec(); if (memberSpec != null) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/pom.xml ---------------------------------------------------------------------- diff --git a/software/database/pom.xml b/software/database/pom.xml index b01f636..fed3b0d 100644 --- a/software/database/pom.xml +++ b/software/database/pom.xml @@ -49,6 +49,8 @@ <exclude>src/main/resources/brooklyn/entity/database/crate/crate.yaml</exclude> <exclude>src/main/resources/brooklyn/entity/database/mariadb/my.cnf</exclude> <exclude>src/main/resources/brooklyn/entity/database/mysql/mysql.conf</exclude> + <exclude>src/main/resources/brooklyn/entity/database/mysql/mysql_master.conf</exclude> + <exclude>src/main/resources/brooklyn/entity/database/mysql/mysql_slave.conf</exclude> <exclude>src/main/resources/brooklyn/entity/database/postgresql/postgresql.conf</exclude> <exclude>src/main/resources/brooklyn/entity/database/rubyrep/rubyrep.conf</exclude> <exclude>src/main/resources/brooklyn/entity/database/mssql/ConfigurationFile.ini</exclude> http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java new file mode 100644 index 0000000..8b19ef7 --- /dev/null +++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlCluster.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.database.mysql; + +import java.util.Collection; + +import com.google.common.reflect.TypeToken; + +import brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.database.DatastoreMixins.HasDatastoreUrl; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; + +@ImplementedBy(MySqlClusterImpl.class) +@Catalog(name="MySql Master-Slave cluster", description="Sets up a cluster of MySQL nodes using master-slave relation and binary logging", iconUrl="classpath:///mysql-logo-110x57.png") +public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl { + interface MySqlMaster { + AttributeSensor<String> MASTER_LOG_FILE = Sensors.newStringSensor("mysql.master.log_file", "The binary log file master is writing to"); + AttributeSensor<Integer> MASTER_LOG_POSITION = Sensors.newIntegerSensor("mysql.master.log_position", "The position in the log file to start replication"); + } + + ConfigKey<String> SLAVE_USERNAME = ConfigKeys.newStringConfigKey( + "mysql.slave.username", "The user name slaves will use to connect to the master", "slave"); + ConfigKey<String> SLAVE_REPLICATE_DO_DB = ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_do_db", "Replicate only listed DBs"); + ConfigKey<String> SLAVE_REPLICATE_IGNORE_DB = ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_ignore_db", "Don't replicate listed DBs"); + ConfigKey<String> SLAVE_REPLICATE_DO_TABLE = ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_do_table", "Replicate only listed tables"); + ConfigKey<String> SLAVE_REPLICATE_IGNORE_TABLE = ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_ignore_table", "Don't replicate listed tables"); + ConfigKey<String> SLAVE_REPLICATE_WILD_DO_TABLE = ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_wild_do_table", "Replicate only listed tables, wildcards acepted"); + ConfigKey<String> SLAVE_REPLICATE_WILD_IGNORE_TABLE = ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_wild_ignore_table", "Don't replicate listed tables, wildcards acepted"); + StringAttributeSensorAndConfigKey SLAVE_PASSWORD = new StringAttributeSensorAndConfigKey( + "mysql.slave.password", "The password slaves will use to connect to the master. Will be auto-generated by default."); + @SuppressWarnings("serial") + AttributeSensor<Collection<String>> SLAVE_DATASTORE_URL_LIST = Sensors.newSensor(new TypeToken<Collection<String>>() {}, + "mysql.slave.datastore.url", "List of all slave's DATASTORE_URL sensors"); + AttributeSensor<Double> QUERIES_PER_SECOND_FROM_MYSQL_PER_NODE = Sensors.newDoubleSensor("mysql.queries.perSec.fromMysql.perNode"); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java new file mode 100644 index 0000000..3eaa335 --- /dev/null +++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlClusterImpl.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package brooklyn.entity.database.mysql; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; + +import com.google.common.base.Functions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Splitter; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.reflect.TypeToken; + +import brooklyn.config.ConfigKey; +import brooklyn.enricher.Enrichers; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.entity.basic.EntityPredicates; +import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic; +import brooklyn.entity.group.DynamicClusterImpl; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.event.AttributeSensor; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.event.basic.DependentConfiguration; +import brooklyn.event.basic.Sensors; +import brooklyn.location.Location; +import brooklyn.util.collections.CollectionFunctionals; +import brooklyn.util.guava.IfFunctions; +import brooklyn.util.task.DynamicTasks; +import brooklyn.util.task.TaskBuilder; +import brooklyn.util.text.Identifiers; + +// https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html + +// TODO CREATION_SCRIPT_CONTENTS executed before replication setup so it is not replicated to slaves +// TODO Bootstrap slave from dump for the case where the binary log is purged +// TODO Promote slave to master +// TODO SSL connection between master and slave +// TODO DB credentials littered all over the place in file system +public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster { + private static final AttributeSensor<Boolean> NODE_REPLICATION_INITIALIZED = Sensors.newBooleanSensor("mysql.replication_initialized"); + + private static final String MASTER_CONFIG_URL = "classpath:///brooklyn/entity/database/mysql/mysql_master.conf"; + private static final String SLAVE_CONFIG_URL = "classpath:///brooklyn/entity/database/mysql/mysql_slave.conf"; + private static final String NOT_UP_REPLICATION = "replication_not_configured"; + private static final int MASTER_SERVER_ID = 1; + private static final Predicate<Entity> IS_MASTER = EntityPredicates.configEqualTo(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID); + + @SuppressWarnings("serial") + private static final AttributeSensor<Supplier<Integer>> SLAVE_NEXT_SERVER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {}, + "mysql.slave.next_server_id", "Returns the ID of the next slave server"); + @SuppressWarnings("serial") + private static final AttributeSensor<Map<String, String>> SLAVE_ID_ADDRESS_MAPPING = Sensors.newSensor(new TypeToken<Map<String, String>>() {}, + "mysql.slave.id_address_mapping", "Maps slave entity IDs to SUBNET_ADDRESS, so the address is known at member remove time."); + + @Override + public void init() { + super.init(); + // Set id supplier in attribute so it is serialized + setAttribute(SLAVE_NEXT_SERVER_ID, new NextServerIdSupplier()); + setAttribute(SLAVE_ID_ADDRESS_MAPPING, new ConcurrentHashMap<String, String>()); + if (getConfig(SLAVE_PASSWORD) == null) { + setAttribute(SLAVE_PASSWORD, Identifiers.makeRandomId(8)); + } else { + setAttribute(SLAVE_PASSWORD, getConfig(SLAVE_PASSWORD)); + } + initSubscriptions(); + } + + @Override + public void rebind() { + super.rebind(); + initSubscriptions(); + } + + private void initSubscriptions() { + subscribeToMembers(this, MySqlNode.SERVICE_PROCESS_IS_RUNNING, new NodeRunningListener(this)); + subscribe(this, MEMBER_REMOVED, new MemberRemovedListener()); + } + + @Override + protected void initEnrichers() { + super.initEnrichers(); + propagateMasterAttribute(MySqlNode.HOSTNAME); + propagateMasterAttribute(MySqlNode.ADDRESS); + propagateMasterAttribute(MySqlNode.MYSQL_PORT); + propagateMasterAttribute(MySqlNode.DATASTORE_URL); + + addEnricher(Enrichers.builder() + .aggregating(MySqlNode.DATASTORE_URL) + .publishing(SLAVE_DATASTORE_URL_LIST) + .computing(Functions.<Collection<String>>identity()) + .entityFilter(Predicates.not(IS_MASTER)) + .fromMembers() + .build()); + + addEnricher(Enrichers.builder() + .aggregating(MySqlNode.QUERIES_PER_SECOND_FROM_MYSQL) + .publishing(QUERIES_PER_SECOND_FROM_MYSQL_PER_NODE) + .fromMembers() + .computingAverage() + .defaultValueForUnreportedSensors(0d) + .build()); + } + + private void propagateMasterAttribute(AttributeSensor<?> att) { + addEnricher(Enrichers.builder() + .aggregating(att) + .publishing(att) + .computing(IfFunctions.ifPredicate(CollectionFunctionals.notEmpty()) + .apply(CollectionFunctionals.firstElement()) + .defaultValue(null)) + .entityFilter(IS_MASTER) + .build()); + } + + @Override + protected EntitySpec<?> getFirstMemberSpec() { + final EntitySpec<?> firstMemberSpec = super.getFirstMemberSpec(); + if (firstMemberSpec != null) { + return applyDefaults(firstMemberSpec, Suppliers.ofInstance(MASTER_SERVER_ID), MASTER_CONFIG_URL, false); + } + + final EntitySpec<?> memberSpec = super.getMemberSpec(); + if (memberSpec != null) { + if (!isKeyConfigured(memberSpec, MySqlNode.TEMPLATE_CONFIGURATION_URL.getConfigKey())) { + return EntitySpec.create(memberSpec) + .configure(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID) + .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, MASTER_CONFIG_URL); + } else { + return memberSpec; + } + } + + return EntitySpec.create(MySqlNode.class) + .displayName("MySql Master") + .configure(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID) + .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, MASTER_CONFIG_URL); + } + + @Override + protected EntitySpec<?> getMemberSpec() { + Supplier<Integer> serverIdSupplier = getAttribute(SLAVE_NEXT_SERVER_ID); + + EntitySpec<?> spec = super.getMemberSpec(); + if (spec != null) { + return applyDefaults(spec, serverIdSupplier, SLAVE_CONFIG_URL, true); + } + + return EntitySpec.create(MySqlNode.class) + .displayName("MySql Slave") + .configure(MySqlNode.MYSQL_SERVER_ID, serverIdSupplier.get()) + .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, SLAVE_CONFIG_URL) + // block inheritance, only master should execute the creation script + .configure(MySqlNode.CREATION_SCRIPT_URL, (String) null) + .configure(MySqlNode.CREATION_SCRIPT_CONTENTS, (String) null); + } + + private EntitySpec<?> applyDefaults(EntitySpec<?> spec, Supplier<Integer> serverId, String configUrl, boolean resetCreationScript) { + boolean needsServerId = !isKeyConfigured(spec, MySqlNode.MYSQL_SERVER_ID); + boolean needsConfigUrl = !isKeyConfigured(spec, MySqlNode.TEMPLATE_CONFIGURATION_URL.getConfigKey()); + boolean needsCreationScriptUrl = resetCreationScript && !isKeyConfigured(spec, MySqlNode.CREATION_SCRIPT_URL); + boolean needsCreationScriptContents = resetCreationScript && !isKeyConfigured(spec, MySqlNode.CREATION_SCRIPT_CONTENTS); + if (needsServerId || needsConfigUrl || needsCreationScriptUrl || needsCreationScriptContents) { + EntitySpec<?> clonedSpec = EntitySpec.create(spec); + if (needsServerId) { + clonedSpec.configure(MySqlNode.MYSQL_SERVER_ID, serverId.get()); + } + if (needsConfigUrl) { + clonedSpec.configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, configUrl); + } + if (needsCreationScriptUrl) { + clonedSpec.configure(MySqlNode.CREATION_SCRIPT_URL, (String) null); + } + if (needsCreationScriptContents) { + clonedSpec.configure(MySqlNode.CREATION_SCRIPT_URL, (String) null); + } + return clonedSpec; + } else { + return spec; + } + } + + private boolean isKeyConfigured(EntitySpec<?> spec, ConfigKey<?> key) { + return spec.getConfig().containsKey(key) || spec.getFlags().containsKey(key.getName()); + } + + @Override + protected Entity createNode(Location loc, Map<?, ?> flags) { + Entity node = super.createNode(loc, flags); + Integer serverId = node.getConfig(MySqlNode.MYSQL_SERVER_ID); + if (serverId > 0) { + ServiceNotUpLogic.updateNotUpIndicator((EntityLocal)node, NOT_UP_REPLICATION, "Replication not started"); + } + return node; + } + + private static class NextServerIdSupplier implements Supplier<Integer> { + private AtomicInteger nextId = new AtomicInteger(MASTER_SERVER_ID+1); + + @Override + public Integer get() { + return nextId.getAndIncrement(); + } + } + + // ============= Member Init ============= + + // The task is executed in inessential context (event handler) so + // not visible in tasks UI. Better make it visible so the user can + // see failures, currently accessible only from logs. + private static final class InitReplicationTask implements Runnable { + private final MySqlCluster cluster; + private final MySqlNode node; + + private InitReplicationTask(MySqlCluster cluster, MySqlNode node) { + this.cluster = cluster; + this.node = node; + } + + @Override + public void run() { + Integer serverId = node.getConfig(MySqlNode.MYSQL_SERVER_ID); + if (serverId == MASTER_SERVER_ID) { + initMaster(node); + } else if (serverId > MASTER_SERVER_ID) { + initSlave(node); + } + ServiceNotUpLogic.clearNotUpIndicator((EntityLocal)node, NOT_UP_REPLICATION); + } + + private void initMaster(MySqlNode master) { + String binLogInfo = executeScriptOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW MASTER STATUS \\G UNLOCK TABLES;"); + Iterator<String> splitIter = Splitter.on(Pattern.compile("\\n|:")) + .omitEmptyStrings() + .trimResults() + .split(binLogInfo) + .iterator(); + while (splitIter.hasNext()) { + String part = splitIter.next(); + if (part.equals("File")) { + String file = splitIter.next(); + ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_FILE, file); + } else if (part.equals("Position")) { + Integer position = new Integer(splitIter.next()); + ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_POSITION, position); + } + } + } + + private void initSlave(MySqlNode slave) { + MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER); + String masterLogFile = validateSqlParam(getAttributeBlocking(master, MySqlMaster.MASTER_LOG_FILE)); + Integer masterLogPos = getAttributeBlocking(master, MySqlMaster.MASTER_LOG_POSITION); + String masterAddress = validateSqlParam(master.getAttribute(MySqlNode.SUBNET_ADDRESS)); + Integer masterPort = master.getAttribute(MySqlNode.MYSQL_PORT); + String slaveAddress = validateSqlParam(slave.getAttribute(MySqlNode.SUBNET_ADDRESS)); + String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME)); + String password = validateSqlParam(cluster.getAttribute(SLAVE_PASSWORD)); + + executeScriptOnNode(master, String.format( + "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n" + + "GRANT REPLICATION SLAVE ON *.* TO '%s'@'%s';\n", + username, slaveAddress, password, username, slaveAddress)); + + String slaveCmd = String.format( + "CHANGE MASTER TO " + + "MASTER_HOST='%s', " + + "MASTER_PORT=%d, " + + "MASTER_USER='%s', " + + "MASTER_PASSWORD='%s', " + + "MASTER_LOG_FILE='%s', " + + "MASTER_LOG_POS=%d;\n" + + "START SLAVE;\n", + masterAddress, masterPort, username, password, masterLogFile, masterLogPos); + executeScriptOnNode(slave, slaveCmd); + + cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).put(slave.getId(), slave.getAttribute(MySqlNode.SUBNET_ADDRESS)); + } + + private <T> T getAttributeBlocking(Entity masterNode, AttributeSensor<T> att) { + return DynamicTasks.queue(DependentConfiguration.attributeWhenReady(masterNode, att)).getUnchecked(); + } + + } + + private static final class NodeRunningListener implements SensorEventListener<Boolean> { + private MySqlCluster cluster; + + public NodeRunningListener(MySqlCluster cluster) { + this.cluster = cluster; + } + + @Override + public void onEvent(SensorEvent<Boolean> event) { + final MySqlNode node = (MySqlNode) event.getSource(); + if (Boolean.TRUE.equals(event.getValue()) && + // We are interested in SERVICE_PROCESS_IS_RUNNING only while haven't come online yet. + // Probably will get several updates while replication is initialized so an additional + // check is needed whether we have already seen this. + Boolean.FALSE.equals(node.getAttribute(MySqlNode.SERVICE_UP)) && + !Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) { + + // Events executed sequentially so no need to synchronize here. + if (Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) { + return; + } + ((EntityLocal)node).setAttribute(NODE_REPLICATION_INITIALIZED, Boolean.TRUE); + + DynamicTasks.queueIfPossible(TaskBuilder.builder() + .name("Configure master-slave replication on node") + .body(new InitReplicationTask(cluster, node)) + .build()) + .orSubmitAsync(node); + } + } + + } + + // ============= Member Remove ============= + + public class MemberRemovedListener implements SensorEventListener<Entity> { + @Override + public void onEvent(SensorEvent<Entity> event) { + MySqlCluster cluster = (MySqlCluster) event.getSource(); + Entity node = event.getValue(); + String slaveAddress = cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).remove(node.getId()); + if (slaveAddress != null) { + DynamicTasks.queueIfPossible(TaskBuilder.builder() + .name("Remove slave access") + .body(new RemoveSlaveConfigTask(cluster, slaveAddress)) + .build()) + .orSubmitAsync(cluster); + } + } + } + + public class RemoveSlaveConfigTask implements Runnable { + private MySqlCluster cluster; + private String slaveAddress; + + public RemoveSlaveConfigTask(MySqlCluster cluster, String slaveAddress) { + this.cluster = cluster; + this.slaveAddress = validateSqlParam(slaveAddress); + } + + @Override + public void run() { + // Could already be gone if stopping the entire app - let it throw an exception + MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER); + String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME)); + executeScriptOnNode(master, String.format("DROP USER '%s'@'%s';", username, slaveAddress)); + } + + } + + // Can't call node.executeScript directly, need to change execution context, so use an effector task + private static String executeScriptOnNode(MySqlNode node, String commands) { + return node.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of("commands", commands)).getUnchecked(); + } + + private static String validateSqlParam(String config) { + // Don't go into escape madness, just deny any suspicious strings. + // Would be nice to use prepared statements, but not worth pulling in the extra dependencies. + if (config.contains("'") && config.contains("\\")) { + throw new IllegalStateException("User provided string contains illegal SQL characters: " + config); + } + return config; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNode.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNode.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNode.java index 36b4812..a5c44e0 100644 --- a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNode.java +++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNode.java @@ -65,6 +65,9 @@ public interface MySqlNode extends SoftwareProcess, HasShortName, DatastoreCommo public static final ConfigKey<Object> MYSQL_SERVER_CONF_LOWER_CASE_TABLE_NAMES = MYSQL_SERVER_CONF.subKey("lower_case_table_names", "See MySQL guide. Set 1 to ignore case in table names (useful for OS portability)"); + @SetFromFlag("serverId") + public static final ConfigKey<Integer> MYSQL_SERVER_ID = ConfigKeys.newIntegerConfigKey("mysql.server_id", "Corresponds to server_id option", 0); + @SetFromFlag("password") public static final StringAttributeSensorAndConfigKey PASSWORD = new StringAttributeSensorAndConfigKey( "mysql.password", "Database admin password (or randomly generated if not set)", null); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNodeImpl.java b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNodeImpl.java index 4f8606a..2e05fb0 100644 --- a/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNodeImpl.java +++ b/software/database/src/main/java/brooklyn/entity/database/mysql/MySqlNodeImpl.java @@ -81,7 +81,7 @@ public class MySqlNodeImpl extends SoftwareProcessImpl implements MySqlNode { } }); } - + @Override protected void connectSensors() { super.connectSensors(); @@ -102,6 +102,7 @@ public class MySqlNodeImpl extends SoftwareProcessImpl implements MySqlNode { .poll(new SshPollConfig<Double>(QUERIES_PER_SECOND_FROM_MYSQL) .command(cmd) .onSuccess(new Function<SshPollValue, Double>() { + @Override public Double apply(SshPollValue input) { String q = Strings.getFirstWordAfter(input.getStdout(), "Queries per second avg:"); if (q==null) return null; @@ -151,7 +152,7 @@ public class MySqlNodeImpl extends SoftwareProcessImpl implements MySqlNode { public String getShortName() { return "MySQL"; } - + @Override public String executeScript(String commands) { return getDriver().executeScriptAsync(commands).block().getStdout(); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_master.conf ---------------------------------------------------------------------- diff --git a/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_master.conf b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_master.conf new file mode 100644 index 0000000..791f2da --- /dev/null +++ b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_master.conf @@ -0,0 +1,26 @@ +[client] +port = ${driver.port?c} +socket = /tmp/mysql.sock.${entity.socketUid}.${driver.port?c} +user = root +password = ${entity.password} + +# Here follows entries for some specific programs + +# The MySQL server +[mysqld] +port = ${driver.port?c} +socket = /tmp/mysql.sock.${entity.socketUid}.${driver.port?c} +basedir = ${driver.baseDir} +datadir = ${driver.dataDir} +bind-address = 0.0.0.0 +# skip-networking + +# Replication config +server-id = 1 +binlog-format = mixed +log-bin = mysql-bin +sync_binlog = 1 +innodb_flush_log_at_trx_commit=1 + +# Custom configuration options +${driver.mySqlServerOptionsString} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_slave.conf ---------------------------------------------------------------------- diff --git a/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_slave.conf b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_slave.conf new file mode 100644 index 0000000..2e1e945 --- /dev/null +++ b/software/database/src/main/resources/brooklyn/entity/database/mysql/mysql_slave.conf @@ -0,0 +1,33 @@ +[#ftl] +[client] +port = ${driver.port?c} +socket = /tmp/mysql.sock.${entity.socketUid}.${driver.port?c} +user = root +password = ${entity.password} + +# Here follows entries for some specific programs + +# The MySQL server +[mysqld] +port = ${driver.port?c} +socket = /tmp/mysql.sock.${entity.socketUid}.${driver.port?c} +basedir = ${driver.baseDir} +datadir = ${driver.dataDir} +bind-address = 0.0.0.0 +# skip-networking + +# Replication config +server-id = ${config["mysql.server_id"]} +relay-log = mysql-slave-${config["mysql.server_id"]}-relay +relay-log-recovery = 1 +relay-log-info-repository = TABLE +relay-log-purge = 1 +[#if !config["mysql.slave.replicate_do_db"]?? ]#[/#if]replicate-do-db = ${config["mysql.slave.replicate_do_db"]!} +[#if !config["mysql.slave.replicate_ignore_db"]?? ]#[/#if]replicate-ignore-db = ${config["mysql.slave.replicate_ignore_db"]!} +[#if !config["mysql.slave.replicate_do_table"]?? ]#[/#if]replicate-do-table = ${config["mysql.slave.replicate_do_table"]!} +[#if !config["mysql.slave.replicate_ignore_table"]?? ]#[/#if]replicate-ignore-table = ${config["mysql.slave.replicate_ignore_table"]!} +[#if !config["mysql.slave.replicate_wild_do_table"]?? ]#[/#if]replicate-wild-do-table = ${config["mysql.slave.replicate_wild_do_table"]!} +[#if !config["mysql.slave.replicate_wild_ignore_table"]??]#[/#if]replicate-wild-ignore-table = ${config["mysql.slave.replicate_wild_ignore_table"]!} + +# Custom configuration options +${driver.mySqlServerOptionsString} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bae9628b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java index 4208fe3..2d08bd2 100644 --- a/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java +++ b/utils/common/src/main/java/brooklyn/util/collections/CollectionFunctionals.java @@ -148,6 +148,14 @@ public class CollectionFunctionals { return Predicates.compose(Predicates.equalTo(targetSize), CollectionFunctionals.sizeFunction()); } + public static Predicate<Iterable<?>> empty() { + return sizeEquals(0); + } + + public static Predicate<Iterable<?>> notEmpty() { + return Predicates.not(empty()); + } + public static <K> Predicate<Map<K,?>> mapSizeEquals(int targetSize) { return Predicates.compose(Predicates.equalTo(targetSize), CollectionFunctionals.<K>mapSize()); }
