http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java new file mode 100644 index 0000000..3dc26cc --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlCluster.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.database.mysql; + +import java.util.Collection; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.api.event.AttributeSensor; + +import com.google.common.reflect.TypeToken; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import org.apache.brooklyn.entity.database.DatastoreMixins.HasDatastoreUrl; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; + +@ImplementedBy(MySqlClusterImpl.class) +@Catalog(name="MySql Master-Slave cluster", description="Sets up a cluster of MySQL nodes using master-slave relation and binary logging", iconUrl="classpath:///mysql-logo-110x57.png") +public interface MySqlCluster extends DynamicCluster, HasDatastoreUrl { + interface MySqlMaster { + AttributeSensor<String> MASTER_LOG_FILE = Sensors.newStringSensor("mysql.master.log_file", "The binary log file master is writing to"); + AttributeSensor<Integer> MASTER_LOG_POSITION = Sensors.newIntegerSensor("mysql.master.log_position", "The position in the log file to start replication"); + } + interface MySqlSlave { + AttributeSensor<Boolean> SLAVE_HEALTHY = Sensors.newBooleanSensor("mysql.slave.healthy", "Indicates that the replication state of the slave is healthy"); + AttributeSensor<Integer> SLAVE_SECONDS_BEHIND_MASTER = Sensors.newIntegerSensor("mysql.slave.seconds_behind_master", "How many seconds behind master is the replication state on the slave"); + } + + ConfigKey<String> SLAVE_USERNAME = ConfigKeys.newStringConfigKey( + "mysql.slave.username", "The user name slaves will use to connect to the master", "slave"); + ConfigKey<String> SLAVE_REPLICATE_DO_DB = ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_do_db", "Replicate only listed DBs"); + ConfigKey<String> SLAVE_REPLICATE_IGNORE_DB = 
ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_ignore_db", "Don't replicate listed DBs"); + ConfigKey<String> SLAVE_REPLICATE_DO_TABLE = ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_do_table", "Replicate only listed tables"); + ConfigKey<String> SLAVE_REPLICATE_IGNORE_TABLE = ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_ignore_table", "Don't replicate listed tables"); + ConfigKey<String> SLAVE_REPLICATE_WILD_DO_TABLE = ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_wild_do_table", "Replicate only listed tables, wildcards acepted"); + ConfigKey<String> SLAVE_REPLICATE_WILD_IGNORE_TABLE = ConfigKeys.newStringConfigKey( + "mysql.slave.replicate_wild_ignore_table", "Don't replicate listed tables, wildcards acepted"); + StringAttributeSensorAndConfigKey SLAVE_PASSWORD = new StringAttributeSensorAndConfigKey( + "mysql.slave.password", "The password slaves will use to connect to the master. Will be auto-generated by default."); + @SuppressWarnings("serial") + AttributeSensor<Collection<String>> SLAVE_DATASTORE_URL_LIST = Sensors.newSensor(new TypeToken<Collection<String>>() {}, + "mysql.slave.datastore.url", "List of all slave's DATASTORE_URL sensors"); + AttributeSensor<Double> QUERIES_PER_SECOND_FROM_MYSQL_PER_NODE = Sensors.newDoubleSensor("mysql.queries.perSec.fromMysql.perNode"); +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java new file mode 100644 index 0000000..5bf6e88 --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlClusterImpl.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.database.mysql; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.api.event.AttributeSensor; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.core.util.task.DynamicTasks; +import org.apache.brooklyn.core.util.task.TaskBuilder; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.reflect.TypeToken; + +import brooklyn.config.ConfigKey; +import brooklyn.enricher.Enrichers; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.basic.EntityPredicates; +import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic; +import brooklyn.entity.group.DynamicClusterImpl; +import brooklyn.event.basic.DependentConfiguration; +import brooklyn.event.basic.Sensors; +import brooklyn.event.feed.function.FunctionFeed; +import brooklyn.event.feed.function.FunctionPollConfig; +import brooklyn.util.collections.CollectionFunctionals; +import brooklyn.util.guava.Functionals; +import brooklyn.util.guava.IfFunctions; +import brooklyn.util.text.Identifiers; +import brooklyn.util.text.StringPredicates; +import brooklyn.util.time.Duration; + +// https://dev.mysql.com/doc/refman/5.7/en/replication-howto.html + +// 
TODO CREATION_SCRIPT_CONTENTS executed before replication setup so it is not replicated to slaves +// TODO Bootstrap slave from dump for the case where the binary log is purged +// TODO Promote slave to master +// TODO SSL connection between master and slave +// TODO DB credentials littered all over the place in file system +public class MySqlClusterImpl extends DynamicClusterImpl implements MySqlCluster { + private static final AttributeSensor<Boolean> NODE_REPLICATION_INITIALIZED = Sensors.newBooleanSensor("mysql.replication_initialized"); + + private static final String MASTER_CONFIG_URL = "classpath:///org/apache/brooklyn/entity/database/mysql/mysql_master.conf"; + private static final String SLAVE_CONFIG_URL = "classpath:///org/apache/brooklyn/entity/database/mysql/mysql_slave.conf"; + private static final int MASTER_SERVER_ID = 1; + private static final Predicate<Entity> IS_MASTER = EntityPredicates.configEqualTo(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID); + + @SuppressWarnings("serial") + private static final AttributeSensor<Supplier<Integer>> SLAVE_NEXT_SERVER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {}, + "mysql.slave.next_server_id", "Returns the ID of the next slave server"); + @SuppressWarnings("serial") + private static final AttributeSensor<Map<String, String>> SLAVE_ID_ADDRESS_MAPPING = Sensors.newSensor(new TypeToken<Map<String, String>>() {}, + "mysql.slave.id_address_mapping", "Maps slave entity IDs to SUBNET_ADDRESS, so the address is known at member remove time."); + + @Override + public void init() { + super.init(); + // Set id supplier in attribute so it is serialized + setAttribute(SLAVE_NEXT_SERVER_ID, new NextServerIdSupplier()); + setAttribute(SLAVE_ID_ADDRESS_MAPPING, new ConcurrentHashMap<String, String>()); + if (getConfig(SLAVE_PASSWORD) == null) { + setAttribute(SLAVE_PASSWORD, Identifiers.makeRandomId(8)); + } else { + setAttribute(SLAVE_PASSWORD, getConfig(SLAVE_PASSWORD)); + } + initSubscriptions(); + } + + 
@Override + public void rebind() { + super.rebind(); + initSubscriptions(); + } + + private void initSubscriptions() { + subscribeToMembers(this, MySqlNode.SERVICE_PROCESS_IS_RUNNING, new NodeRunningListener(this)); + subscribe(this, MEMBER_REMOVED, new MemberRemovedListener()); + } + + @Override + protected void initEnrichers() { + super.initEnrichers(); + propagateMasterAttribute(MySqlNode.HOSTNAME); + propagateMasterAttribute(MySqlNode.ADDRESS); + propagateMasterAttribute(MySqlNode.SUBNET_HOSTNAME); + propagateMasterAttribute(MySqlNode.SUBNET_ADDRESS); + propagateMasterAttribute(MySqlNode.MYSQL_PORT); + propagateMasterAttribute(MySqlNode.DATASTORE_URL); + + addEnricher(Enrichers.builder() + .aggregating(MySqlNode.DATASTORE_URL) + .publishing(SLAVE_DATASTORE_URL_LIST) + .computing(Functions.<Collection<String>>identity()) + .entityFilter(Predicates.not(IS_MASTER)) + .fromMembers() + .build()); + + addEnricher(Enrichers.builder() + .aggregating(MySqlNode.QUERIES_PER_SECOND_FROM_MYSQL) + .publishing(QUERIES_PER_SECOND_FROM_MYSQL_PER_NODE) + .fromMembers() + .computingAverage() + .defaultValueForUnreportedSensors(0d) + .build()); + } + + private void propagateMasterAttribute(AttributeSensor<?> att) { + addEnricher(Enrichers.builder() + .aggregating(att) + .publishing(att) + .computing(IfFunctions.ifPredicate(CollectionFunctionals.notEmpty()) + .apply(CollectionFunctionals.firstElement()) + .defaultValue(null)) + .entityFilter(IS_MASTER) + .build()); + } + + @Override + protected EntitySpec<?> getFirstMemberSpec() { + final EntitySpec<?> firstMemberSpec = super.getFirstMemberSpec(); + if (firstMemberSpec != null) { + return applyDefaults(firstMemberSpec, Suppliers.ofInstance(MASTER_SERVER_ID), MASTER_CONFIG_URL, false); + } + + final EntitySpec<?> memberSpec = super.getMemberSpec(); + if (memberSpec != null) { + if (!isKeyConfigured(memberSpec, MySqlNode.TEMPLATE_CONFIGURATION_URL.getConfigKey())) { + return EntitySpec.create(memberSpec) + 
.configure(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID) + .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, MASTER_CONFIG_URL); + } else { + return memberSpec; + } + } + + return EntitySpec.create(MySqlNode.class) + .displayName("MySql Master") + .configure(MySqlNode.MYSQL_SERVER_ID, MASTER_SERVER_ID) + .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, MASTER_CONFIG_URL); + } + + @Override + protected EntitySpec<?> getMemberSpec() { + Supplier<Integer> serverIdSupplier = getAttribute(SLAVE_NEXT_SERVER_ID); + + EntitySpec<?> spec = super.getMemberSpec(); + if (spec != null) { + return applyDefaults(spec, serverIdSupplier, SLAVE_CONFIG_URL, true); + } + + return EntitySpec.create(MySqlNode.class) + .displayName("MySql Slave") + .configure(MySqlNode.MYSQL_SERVER_ID, serverIdSupplier.get()) + .configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, SLAVE_CONFIG_URL) + // block inheritance, only master should execute the creation script + .configure(MySqlNode.CREATION_SCRIPT_URL, (String) null) + .configure(MySqlNode.CREATION_SCRIPT_CONTENTS, (String) null); + } + + private EntitySpec<?> applyDefaults(EntitySpec<?> spec, Supplier<Integer> serverId, String configUrl, boolean resetCreationScript) { + boolean needsServerId = !isKeyConfigured(spec, MySqlNode.MYSQL_SERVER_ID); + boolean needsConfigUrl = !isKeyConfigured(spec, MySqlNode.TEMPLATE_CONFIGURATION_URL.getConfigKey()); + boolean needsCreationScriptUrl = resetCreationScript && !isKeyConfigured(spec, MySqlNode.CREATION_SCRIPT_URL); + boolean needsCreationScriptContents = resetCreationScript && !isKeyConfigured(spec, MySqlNode.CREATION_SCRIPT_CONTENTS); + if (needsServerId || needsConfigUrl || needsCreationScriptUrl || needsCreationScriptContents) { + EntitySpec<?> clonedSpec = EntitySpec.create(spec); + if (needsServerId) { + clonedSpec.configure(MySqlNode.MYSQL_SERVER_ID, serverId.get()); + } + if (needsConfigUrl) { + clonedSpec.configure(MySqlNode.TEMPLATE_CONFIGURATION_URL, configUrl); + } + if (needsCreationScriptUrl) { + 
clonedSpec.configure(MySqlNode.CREATION_SCRIPT_URL, (String) null); + } + if (needsCreationScriptContents) { + clonedSpec.configure(MySqlNode.CREATION_SCRIPT_URL, (String) null); + } + return clonedSpec; + } else { + return spec; + } + } + + private boolean isKeyConfigured(EntitySpec<?> spec, ConfigKey<?> key) { + return spec.getConfig().containsKey(key) || spec.getFlags().containsKey(key.getName()); + } + + @Override + protected Entity createNode(Location loc, Map<?, ?> flags) { + Entity node = super.createNode(loc, flags); + if (!IS_MASTER.apply(node)) { + ServiceNotUpLogic.updateNotUpIndicator((EntityLocal)node, MySqlSlave.SLAVE_HEALTHY, "Replication not started"); + + addFeed(FunctionFeed.builder() + .entity((EntityLocal)node) + .period(Duration.FIVE_SECONDS) + .poll(FunctionPollConfig.forSensor(MySqlSlave.SLAVE_HEALTHY) + .callable(new SlaveStateCallable(node)) + .checkSuccess(StringPredicates.isNonBlank()) + .onSuccess(new SlaveStateParser(node)) + .setOnFailure(false) + .description("Polls SHOW SLAVE STATUS")) + .build()); + + node.addEnricher(Enrichers.builder().updatingMap(Attributes.SERVICE_NOT_UP_INDICATORS) + .from(MySqlSlave.SLAVE_HEALTHY) + .computing(Functionals.ifNotEquals(true).value("Slave replication status is not healthy") ) + .build()); + } + return node; + } + + public static class SlaveStateCallable implements Callable<String> { + private Entity slave; + public SlaveStateCallable(Entity slave) { + this.slave = slave; + } + + @Override + public String call() throws Exception { + if (Boolean.TRUE.equals(slave.getAttribute(MySqlNode.SERVICE_PROCESS_IS_RUNNING))) { + return slave.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of("commands", "SHOW SLAVE STATUS \\G")).asTask().getUnchecked(); + } else { + return null; + } + } + + } + + public static class SlaveStateParser implements Function<String, Boolean> { + private Entity slave; + + public SlaveStateParser(Entity slave) { + this.slave = slave; + } + + @Override + public Boolean apply(String 
result) { + Map<String, String> status = MySqlRowParser.parseSingle(result); + String secondsBehindMaster = status.get("Seconds_Behind_Master"); + if (secondsBehindMaster != null && !"NULL".equals(secondsBehindMaster)) { + ((EntityLocal)slave).setAttribute(MySqlSlave.SLAVE_SECONDS_BEHIND_MASTER, new Integer(secondsBehindMaster)); + } + return "Yes".equals(status.get("Slave_IO_Running")) && "Yes".equals(status.get("Slave_SQL_Running")); + } + + } + + private static class NextServerIdSupplier implements Supplier<Integer> { + private AtomicInteger nextId = new AtomicInteger(MASTER_SERVER_ID+1); + + @Override + public Integer get() { + return nextId.getAndIncrement(); + } + } + + // ============= Member Init ============= + + // The task is executed in inessential context (event handler) so + // not visible in tasks UI. Better make it visible so the user can + // see failures, currently accessible only from logs. + private static final class InitReplicationTask implements Runnable { + private final MySqlCluster cluster; + private final MySqlNode node; + + private InitReplicationTask(MySqlCluster cluster, MySqlNode node) { + this.cluster = cluster; + this.node = node; + } + + @Override + public void run() { + Integer serverId = node.getConfig(MySqlNode.MYSQL_SERVER_ID); + if (serverId == MASTER_SERVER_ID) { + initMaster(node); + } else if (serverId > MASTER_SERVER_ID) { + initSlave(node); + } + } + + private void initMaster(MySqlNode master) { + String binLogInfo = executeScriptOnNode(master, "FLUSH TABLES WITH READ LOCK;SHOW MASTER STATUS \\G UNLOCK TABLES;"); + Map<String, String> status = MySqlRowParser.parseSingle(binLogInfo); + String file = status.get("File"); + if (file != null) { + ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_FILE, file); + } + String position = status.get("Position"); + if (position != null) { + ((EntityInternal)master).setAttribute(MySqlMaster.MASTER_LOG_POSITION, new Integer(position)); + } + } + + private void 
initSlave(MySqlNode slave) { + MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER); + String masterLogFile = validateSqlParam(getAttributeBlocking(master, MySqlMaster.MASTER_LOG_FILE)); + Integer masterLogPos = getAttributeBlocking(master, MySqlMaster.MASTER_LOG_POSITION); + String masterAddress = validateSqlParam(master.getAttribute(MySqlNode.SUBNET_ADDRESS)); + Integer masterPort = master.getAttribute(MySqlNode.MYSQL_PORT); + String slaveAddress = validateSqlParam(slave.getAttribute(MySqlNode.SUBNET_ADDRESS)); + String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME)); + String password = validateSqlParam(cluster.getAttribute(SLAVE_PASSWORD)); + + executeScriptOnNode(master, String.format( + "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n" + + "GRANT REPLICATION SLAVE ON *.* TO '%s'@'%s';\n", + username, slaveAddress, password, username, slaveAddress)); + + String slaveCmd = String.format( + "CHANGE MASTER TO " + + "MASTER_HOST='%s', " + + "MASTER_PORT=%d, " + + "MASTER_USER='%s', " + + "MASTER_PASSWORD='%s', " + + "MASTER_LOG_FILE='%s', " + + "MASTER_LOG_POS=%d;\n" + + "START SLAVE;\n", + masterAddress, masterPort, username, password, masterLogFile, masterLogPos); + executeScriptOnNode(slave, slaveCmd); + + cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).put(slave.getId(), slave.getAttribute(MySqlNode.SUBNET_ADDRESS)); + } + + private <T> T getAttributeBlocking(Entity masterNode, AttributeSensor<T> att) { + return DynamicTasks.queue(DependentConfiguration.attributeWhenReady(masterNode, att)).getUnchecked(); + } + + } + + private static final class NodeRunningListener implements SensorEventListener<Boolean> { + private MySqlCluster cluster; + + public NodeRunningListener(MySqlCluster cluster) { + this.cluster = cluster; + } + + @Override + public void onEvent(SensorEvent<Boolean> event) { + final MySqlNode node = (MySqlNode) event.getSource(); + if (Boolean.TRUE.equals(event.getValue()) && + // We are interested in 
SERVICE_PROCESS_IS_RUNNING only while haven't come online yet. + // Probably will get several updates while replication is initialized so an additional + // check is needed whether we have already seen this. + Boolean.FALSE.equals(node.getAttribute(MySqlNode.SERVICE_UP)) && + !Boolean.TRUE.equals(node.getAttribute(NODE_REPLICATION_INITIALIZED))) { + + // Events executed sequentially so no need to synchronize here. + ((EntityLocal)node).setAttribute(NODE_REPLICATION_INITIALIZED, Boolean.TRUE); + + DynamicTasks.queueIfPossible(TaskBuilder.builder() + .name("Configure master-slave replication on node") + .body(new InitReplicationTask(cluster, node)) + .build()) + .orSubmitAsync(node); + } + } + + } + + // ============= Member Remove ============= + + public class MemberRemovedListener implements SensorEventListener<Entity> { + @Override + public void onEvent(SensorEvent<Entity> event) { + MySqlCluster cluster = (MySqlCluster) event.getSource(); + Entity node = event.getValue(); + String slaveAddress = cluster.getAttribute(SLAVE_ID_ADDRESS_MAPPING).remove(node.getId()); + if (slaveAddress != null) { + DynamicTasks.queueIfPossible(TaskBuilder.builder() + .name("Remove slave access") + .body(new RemoveSlaveConfigTask(cluster, slaveAddress)) + .build()) + .orSubmitAsync(cluster); + } + } + } + + public class RemoveSlaveConfigTask implements Runnable { + private MySqlCluster cluster; + private String slaveAddress; + + public RemoveSlaveConfigTask(MySqlCluster cluster, String slaveAddress) { + this.cluster = cluster; + this.slaveAddress = validateSqlParam(slaveAddress); + } + + @Override + public void run() { + // Could already be gone if stopping the entire app - let it throw an exception + MySqlNode master = (MySqlNode) Iterables.find(cluster.getMembers(), IS_MASTER); + String username = validateSqlParam(cluster.getConfig(SLAVE_USERNAME)); + executeScriptOnNode(master, String.format("DROP USER '%s'@'%s';", username, slaveAddress)); + } + + } + + // Can't call 
node.executeScript directly, need to change execution context, so use an effector task + private static String executeScriptOnNode(MySqlNode node, String commands) { + return node.invoke(MySqlNode.EXECUTE_SCRIPT, ImmutableMap.of(MySqlNode.EXECUTE_SCRIPT_COMMANDS, commands)).getUnchecked(); + } + + private static String validateSqlParam(String config) { + // Don't go into escape madness, just deny any suspicious strings. + // Would be nice to use prepared statements, but not worth pulling in the extra dependencies. + if (config.contains("'") && config.contains("\\")) { + throw new IllegalStateException("User provided string contains illegal SQL characters: " + config); + } + return config; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java new file mode 100644 index 0000000..dd63ae1 --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlDriver.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.entity.database.mysql;

import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;

import brooklyn.entity.basic.SoftwareProcessDriver;

/**
 * Driver contract for a MySQL node, extending the generic {@link SoftwareProcessDriver}.
 */
public interface MySqlDriver extends SoftwareProcessDriver {

    /**
     * @return the command line used to query the status of the MySQL server
     */
    String getStatusCmd();

    /**
     * Starts execution of the given script and returns a handle to the running process.
     *
     * @param commands the script to execute
     * @return the task wrapper of the launched process; its type parameter is {@code Integer}
     */
    ProcessTaskWrapper<Integer> executeScriptAsync(String commands);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java new file mode 100644 index 0000000..226d8fb --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNode.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.entity.database.mysql;

import org.apache.brooklyn.api.catalog.Catalog;
import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
import org.apache.brooklyn.api.entity.trait.HasShortName;
import org.apache.brooklyn.api.event.AttributeSensor;
import org.apache.brooklyn.core.util.flags.SetFromFlag;

import brooklyn.config.ConfigKey;
import brooklyn.entity.annotation.Effector;
import brooklyn.entity.annotation.EffectorParam;
import brooklyn.entity.basic.Attributes;
import brooklyn.entity.basic.ConfigKeys;
import brooklyn.entity.basic.MethodEffector;
import brooklyn.entity.basic.SoftwareProcess;
import org.apache.brooklyn.entity.database.DatastoreMixins.DatastoreCommon;
import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
import brooklyn.event.basic.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey;
import brooklyn.event.basic.MapConfigKey;
import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
import brooklyn.event.basic.Sensors;

import org.apache.brooklyn.location.basic.PortRanges;

/**
 * A single MySQL server process: its config keys, sensors and the effector for running SQL scripts.
 */
@Catalog(
        name="MySql Node",
        description="MySql is an open source relational database management system (RDBMS)",
        iconUrl="classpath:///mysql-logo-110x57.png")
@ImplementedBy(MySqlNodeImpl.class)
public interface MySqlNode extends SoftwareProcess, HasShortName, DatastoreCommon {

    /**
     * Version to install.
     * MySQL changes the minor version number of their GA release frequently, so check for the
     * latest version if install fails.
     */
    @SetFromFlag("version")
    ConfigKey<String> SUGGESTED_VERSION =
            ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "5.6.26");

    /*
     * Download URL template. Example URLs:
     *   http://dev.mysql.com/get/Downloads/MySQL-5.6/mysql-5.6.26-osx10.9-x86_64.tar.gz
     *   http://dev.mysql.com/get/Downloads/MySQL-5.6/mysql-5.6.26-linux-glibc2.5-x86_64.tar.gz
     */
    @SetFromFlag("downloadUrl")
    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new StringAttributeSensorAndConfigKey(
            Attributes.DOWNLOAD_URL,
            "http://dev.mysql.com/get/Downloads/MySQL-5.6/mysql-${version}-${driver.osTag}.tar.gz");

    /** Port of the server; 3306 is tried first, then 13306 and upwards. */
    @SetFromFlag("port")
    PortAttributeSensorAndConfigKey MYSQL_PORT = new PortAttributeSensorAndConfigKey(
            "mysql.port", "MySQL port", PortRanges.fromString("3306, 13306+"));

    /** Directory for the data files; no default. */
    @SetFromFlag("dataDir")
    ConfigKey<String> DATA_DIR = ConfigKeys.newStringConfigKey(
            "mysql.datadir", "Directory for writing data files", null);

    /** Free-form options for mysqld. */
    @SetFromFlag("serverConf")
    MapConfigKey<Object> MYSQL_SERVER_CONF = new MapConfigKey<Object>(
            Object.class, "mysql.server.conf", "Configuration options for mysqld");

    /** Sub-key of {@link #MYSQL_SERVER_CONF} for the {@code lower_case_table_names} option. */
    ConfigKey<Object> MYSQL_SERVER_CONF_LOWER_CASE_TABLE_NAMES = MYSQL_SERVER_CONF.subKey(
            "lower_case_table_names",
            "See MySQL guide. Set 1 to ignore case in table names (useful for OS portability)");

    /** Value of the server's {@code server_id} option; defaults to 0. */
    @SetFromFlag("serverId")
    ConfigKey<Integer> MYSQL_SERVER_ID = ConfigKeys.newIntegerConfigKey(
            "mysql.server_id", "Corresponds to server_id option", 0);

    /** Database admin password. */
    @SetFromFlag("password")
    StringAttributeSensorAndConfigKey PASSWORD = new StringAttributeSensorAndConfigKey(
            "mysql.password", "Database admin password (or randomly generated if not set)", null);

    /** Identifier used in the socket file name. */
    @SetFromFlag("socketUid")
    StringAttributeSensorAndConfigKey SOCKET_UID = new StringAttributeSensorAndConfigKey(
            "mysql.socketUid",
            "Socket uid, for use in file /tmp/mysql.sock.<uid>.3306 (or randomly generated if not set)",
            null);

    /** @deprecated since 0.7.0 use DATASTORE_URL */
    @Deprecated
    AttributeSensor<String> MYSQL_URL = DATASTORE_URL;

    /** Location of the freemarker template from which the server's configuration file is produced. */
    @SetFromFlag("configurationTemplateUrl")
    BasicAttributeSensorAndConfigKey<String> TEMPLATE_CONFIGURATION_URL = new StringAttributeSensorAndConfigKey(
            "mysql.template.configuration.url",
            "Template file (in freemarker format) for the mysql.conf file",
            "classpath://org/apache/brooklyn/entity/database/mysql/mysql.conf");

    /** Queries-per-second figure reported for this server. */
    AttributeSensor<Double> QUERIES_PER_SECOND_FROM_MYSQL =
            Sensors.newDoubleSensor("mysql.queries.perSec.fromMysql");

    /** Effector handle for {@link #executeScript(String)}. */
    MethodEffector<String> EXECUTE_SCRIPT = new MethodEffector<String>(MySqlNode.class, "executeScript");

    /** Name of the single parameter of {@link #EXECUTE_SCRIPT}. */
    String EXECUTE_SCRIPT_COMMANDS = "commands";

    /**
     * Executes an SQL script on the node as the root user.
     *
     * @param commands the SQL to run
     * @return the result of the execution as a string
     */
    @Effector(description = "Execute SQL script on the node as the root user")
    String executeScript(@EffectorParam(name=EXECUTE_SCRIPT_COMMANDS) String commands);

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java new
file mode 100644 index 0000000..075db69 --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlNodeImpl.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.database.mysql; + +import java.util.Map; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.core.util.config.ConfigBag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.entity.effector.EffectorBody; +import brooklyn.event.feed.ssh.SshFeed; +import brooklyn.event.feed.ssh.SshPollConfig; +import brooklyn.event.feed.ssh.SshPollValue; + +import org.apache.brooklyn.location.basic.Locations; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import brooklyn.util.collections.MutableMap; +import brooklyn.util.guava.Maybe; +import brooklyn.util.text.Identifiers; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; + +import com.google.common.base.Function; + +public class MySqlNodeImpl extends SoftwareProcessImpl implements MySqlNode { + + private static final Logger LOG = LoggerFactory.getLogger(MySqlNodeImpl.class); + + 
private SshFeed feed; // SSH poller created in connectSensors(), stopped in disconnectSensors()

    /** Creates an entity with no flags and no parent. */
    public MySqlNodeImpl() {
    }

    /** Creates an entity with no flags under the given parent. */
    public MySqlNodeImpl(Entity parent) {
        this(MutableMap.of(), parent);
    }

    /** Creates a parentless entity configured from the given flags. */
    public MySqlNodeImpl(Map<?,?> flags) {
        super(flags, null);
    }

    /** Creates an entity configured from the given flags under the given parent. */
    public MySqlNodeImpl(Map<?,?> flags, Entity parent) {
        super(flags, parent);
    }

    @Override
    public Class<?> getDriverInterface() {
        return MySqlDriver.class;
    }

    @Override
    public MySqlDriver getDriver() {
        return (MySqlDriver) super.getDriver();
    }

    /**
     * Registers the body of the {@code EXECUTE_SCRIPT} effector, which delegates to
     * {@link #executeScript(String)}.
     */
    @Override
    public void init() {
        super.init();
        getMutableEntityType().addEffector(EXECUTE_SCRIPT, new EffectorBody<String>() {
            @Override
            public String call(ConfigBag parameters) {
                // Use the same constant the effector declaration uses for its parameter name,
                // so the two cannot drift apart.
                return executeScript((String) parameters.getStringKey(MySqlNode.EXECUTE_SCRIPT_COMMANDS));
            }
        });
    }

    /**
     * Publishes the datastore URL and, when the entity runs on a single SSH machine,
     * starts a feed that polls the driver's status command every five seconds.
     * <p>
     * The feed populates {@code QUERIES_PER_SECOND_FROM_MYSQL} (only when
     * {@code RETRIEVE_USAGE_METRICS} is enabled) and {@code SERVICE_PROCESS_IS_RUNNING}.
     * Without an SSH machine no polling is possible, so {@code SERVICE_UP} is set to
     * {@code true} straight away.
     */
    @Override
    protected void connectSensors() {
        super.connectSensors();
        setAttribute(DATASTORE_URL, String.format("mysql://%s:%s/", getAttribute(HOSTNAME), getAttribute(MYSQL_PORT)));

        /*
         * TODO status gives us things like:
         * Uptime: 2427 Threads: 1 Questions: 581 Slow queries: 0 Opens: 53 Flush tables: 1 Open tables: 35 Queries per second avg: 0.239
         * So can extract lots of sensors from that.
         */
        Maybe<SshMachineLocation> machine = Locations.findUniqueSshMachineLocation(getLocations());
        boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS);

        if (machine.isPresent()) {
            String cmd = getDriver().getStatusCmd();
            feed = SshFeed.builder()
                    .entity(this)
                    .period(Duration.FIVE_SECONDS)
                    .machine(machine.get())
                    .poll(new SshPollConfig<Double>(QUERIES_PER_SECOND_FROM_MYSQL)
                            .command(cmd)
                            .onSuccess(new Function<SshPollValue, Double>() {
                                @Override
                                public Double apply(SshPollValue input) {
                                    // The figure of interest is the word following this label in the status output.
                                    String q = Strings.getFirstWordAfter(input.getStdout(), "Queries per second avg:");
                                    if (q==null) return null;
                                    return Double.parseDouble(q);
                                }})
                            .setOnFailureOrException(null)
                            .enabled(retrieveUsageMetrics))
                    .poll(new SshPollConfig<Boolean>(SERVICE_PROCESS_IS_RUNNING)
                            .command(cmd)
                            .setOnSuccess(true)
                            .setOnFailureOrException(false)
                            .suppressDuplicates(true))
                    .build();
        } else {
            LOG.warn("Location(s) {} not an ssh-machine location, so not polling for status; setting serviceUp immediately", getLocations());
            setAttribute(SERVICE_UP, true);
        }
    }

    /** Stops the SSH feed, if one was started, before the superclass disconnects its own sensors. */
    @Override
    protected void disconnectSensors() {
        if (feed != null) feed.stop();
        super.disconnectSensors();
    }

    /** Returns the value of the {@code MYSQL_PORT} attribute. */
    public int getPort() {
        return getAttribute(MYSQL_PORT);
    }

    /**
     * Returns the socket uid, generating a random 6-character id and storing it on the
     * {@code SOCKET_UID} attribute the first time it is found blank.
     */
    public String getSocketUid() {
        String result = getAttribute(MySqlNode.SOCKET_UID);
        if (Strings.isBlank(result)) {
            result = Identifiers.makeRandomId(6);
            setAttribute(MySqlNode.SOCKET_UID, result);
        }
        return result;
    }

    /**
     * Returns the admin password, generating a random 6-character id and storing it on the
     * {@code PASSWORD} attribute the first time it is found blank.
     */
    public String getPassword() {
        String result = getAttribute(MySqlNode.PASSWORD);
        if (Strings.isBlank(result)) {
            // NOTE(review): a 6-character random id looks short for a database admin password;
            // consider a longer value - confirm what Identifiers.makeRandomId guarantees.
            result = Identifiers.makeRandomId(6);
            setAttribute(MySqlNode.PASSWORD, result);
        }
        return result;
    }

    @Override
    public String getShortName() {
        return "MySQL";
    }

    /**
     * Runs the given commands through the driver, blocks until the task finishes and
     * returns its standard output.
     */
    @Override
    public String executeScript(String commands) {
        return getDriver().executeScriptAsync(commands).block().getStdout();
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlRowParser.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlRowParser.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlRowParser.java new file mode 100644 index 0000000..2ae12b5 --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlRowParser.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.entity.database.mysql; + +import java.util.Map; + +import brooklyn.util.collections.MutableMap; +import brooklyn.util.text.Strings; +
/**
 * Parses a single database row printed as one {@code key: value} pair per line.
 * <p>
 * NOTE(review): presumably this is the vertical output format of the mysql client;
 * confirm against the callers.
 */
public class MySqlRowParser {

    /**
     * Splits {@code row} into lines and collects every {@code key: value} pair.
     * <p>
     * Lines starting with {@code *} are row delimiters and are skipped. Lines that contain
     * no {@code :} (blank lines, stray carriage returns, banners) are skipped as well,
     * rather than failing with an {@link ArrayIndexOutOfBoundsException}.
     *
     * @param row the textual row, lines separated by {@code \n}
     * @return a map from trimmed key to trimmed value; an empty value is stored as {@code null}
     */
    public static Map<String, String> parseSingle(String row) {
        Map<String, String> values = MutableMap.of();
        for (String line : row.split("\\n")) {
            if (line.startsWith("*")) continue; // row delimiter
            // Limit of 2 keeps any further ':' characters inside the value.
            String[] arr = line.split(":", 2);
            if (arr.length < 2) continue; // no separator, so not a key/value line
            String key = arr[0].trim();
            String value = Strings.emptyToNull(arr[1].trim());
            values.put(key, value);
        }
        return values;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java new file mode 100644 index 0000000..eef77cd --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/MySqlSshDriver.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.database.mysql; + +import static brooklyn.util.JavaGroovyEquivalents.groovyTruth; +import static brooklyn.util.ssh.BashCommands.commandsToDownloadUrlsAs; +import static brooklyn.util.ssh.BashCommands.installPackage; +import static java.lang.String.format; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.Entities; +import org.apache.brooklyn.entity.database.DatastoreMixins; +import brooklyn.entity.software.SshEffectorTasks; + +import org.apache.brooklyn.api.location.OsDetails; +import org.apache.brooklyn.core.util.task.DynamicTasks; +import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper; +import org.apache.brooklyn.location.basic.BasicOsDetails.OsVersions; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import brooklyn.util.collections.MutableMap; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.io.FileUtil; +import brooklyn.util.net.Urls; +import brooklyn.util.os.Os; +import brooklyn.util.ssh.BashCommands; +import brooklyn.util.stream.Streams; +import brooklyn.util.text.ComparableVersion; +import brooklyn.util.text.Identifiers; +import brooklyn.util.text.Strings; +import brooklyn.util.time.CountdownTimer; +import brooklyn.util.time.Duration; + +import com.google.common.collect.ImmutableMap; + +/** + * The SSH implementation of the {@link MySqlDriver}. 
+ */
public class MySqlSshDriver extends AbstractSoftwareProcessSshDriver implements MySqlDriver {

    public static final Logger log = LoggerFactory.getLogger(MySqlSshDriver.class);

    /** Creates the driver and publishes the log file location on the entity. */
    public MySqlSshDriver(MySqlNodeImpl entity, SshMachineLocation machine) {
        super(entity, machine);

        entity.setAttribute(Attributes.LOG_FILE_LOCATION, getLogFile());
    }

    /**
     * Returns the platform tag used in the name of the MySQL download for the target machine.
     *
     * @throws IllegalStateException if the machine is a 32-bit Mac
     */
    public String getOsTag() {
        // e.g. "osx10.6-x86_64"; see http://www.mysql.com/downloads/mysql/#downloads
        OsDetails os = getLocation().getOsDetails();
        if (os == null) return "linux-glibc2.5-x86_64";
        if (os.isMac()) {
            String osp1 = os.getVersion()==null ? "osx10.8" //lowest common denominator
                : new ComparableVersion(os.getVersion()).isGreaterThanOrEqualTo(OsVersions.MAC_10_9) ? "osx10.9"
                : "osx10.8"; //lowest common denominator
            if (!os.is64bit()) {
                throw new IllegalStateException("Only 64 bit MySQL build is available for OS X");
            }
            return osp1+"-x86_64";
        }
        //assume generic linux
        String osp1 = "linux-glibc2.5";
        String osp2 = os.is64bit() ? "x86_64" : "i686";
        return osp1+"-"+osp2;
    }

    /** Returns the expanded install directory, used as the MySQL base directory. */
    public String getBaseDir() { return getExpandedInstallDir(); }

    /** Returns the configured {@code DATA_DIR}, or {@code "."} when none is configured. */
    public String getDataDir() {
        String result = entity.getConfig(MySqlNode.DATA_DIR);
        return (result == null) ? "." : result;
    }

    /** Returns the path of {@code console.log} inside the run directory. */
    public String getLogFile() {
        return Urls.mergePaths(getRunDir(), "console.log");
    }

    /** Returns the file name (not a full path) of the MySQL configuration file. */
    public String getConfigFile() {
        return "mymysql.cnf";
    }

    /** Returns the archive name built from the MySQL version and {@link #getOsTag()}. */
    public String getInstallFilename() {
        return String.format("mysql-%s-%s.tar.gz", getVersion(), getOsTag());
    }

    @Override
    public void preInstall() {
        resolver = Entities.newDownloader(this, ImmutableMap.of("filename", getInstallFilename()));
        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("mysql-%s-%s", getVersion(), getOsTag()))));
    }

    /** Installs the prerequisite packages, then downloads and unpacks the MySQL archive. */
    @Override
    public void install() {
        List<String> urls = resolver.getTargets();
        String saveAs = resolver.getFilename();

        List<String> commands = new LinkedList<String>();
        commands.add(BashCommands.INSTALL_TAR);
        commands.add(BashCommands.INSTALL_CURL);

        commands.add("echo installing extra packages");
        commands.add(installPackage(ImmutableMap.of("yum", "libgcc_s.so.1"), null));
        commands.add(installPackage(ImmutableMap.of("yum", "libaio.so.1 libncurses.so.5", "apt", "libaio1 libaio-dev"), null));

        // these deps are only needed on some OS versions but others don't need them
        commands.add(installPackage(ImmutableMap.of("yum", "libaio", "apt", "ia32-libs"), null));
        commands.add("echo finished installing extra packages");
        commands.addAll(commandsToDownloadUrlsAs(urls, saveAs));
        commands.add(format("tar xfvz %s", saveAs));

        newScript(INSTALLING).body.append(commands).execute();
    }

    @Override
    public MySqlNodeImpl getEntity() { return (MySqlNodeImpl) super.getEntity(); }
    public int getPort() { return getEntity().getPort(); }
    public String getSocketUid() { return getEntity().getSocketUid(); }
    public String getPassword() { return getEntity().getPassword(); }

    /**
     * Wraps {@code value} in single quotes for use as one word of a POSIX shell command;
     * embedded single quotes are emitted as {@code '\''}.
     */
    private static String shellSingleQuote(String value) {
        return "'" + value.replace("'", "'\\''") + "'";
    }

    /**
     * Writes the configuration file, initialises the data directory, launches the server,
     * sets the admin password, runs the creation script if there is one, and stops the server.
     */
    @Override
    public void customize() {
        copyDatabaseConfigScript();

        newScript(CUSTOMIZING)
            .updateTaskAndFailOnNonZeroResultCode()
            .body.append(
                "chmod 600 "+getConfigFile(),
                getBaseDir()+"/scripts/mysql_install_db "+
                    "--basedir="+getBaseDir()+" --datadir="+getDataDir()+" "+
                    "--defaults-file="+getConfigFile())
            .execute();

        // launch, then we will configure it
        launch();

        // The 20s timer starts before the script copy, so the copy time counts towards the wait.
        CountdownTimer timer = Duration.seconds(20).countdownTimer();
        boolean hasCreationScript = copyDatabaseCreationScript();
        timer.waitForExpiryUnchecked();

        // The password may come from user configuration, so quote it rather than letting the
        // shell interpret spaces or metacharacters in it.
        // NOTE(review): this task does not require a zero exit code, so a failure to set the
        // password goes unnoticed here - confirm whether that is intended.
        DynamicTasks.queue(
            SshEffectorTasks.ssh(
                "cd "+getRunDir(),
                getBaseDir()+"/bin/mysqladmin --defaults-file="+getConfigFile()+" --password= password "+shellSingleQuote(getPassword())
            ).summary("setting password"));

        if (hasCreationScript)
            executeScriptFromInstalledFileAsync("creation-script.sql").asTask().getUnchecked();

        // not sure necessary to stop then subsequently launch, but seems safest
        // (if skipping, use a flag in launch to indicate we've just launched it)
        stop();
    }

    /** Renders the configuration template and copies the result into the run directory. */
    protected void copyDatabaseConfigScript() {
        newScript(CUSTOMIZING).execute(); //create the directory

        String configScriptContents = processTemplate(entity.getAttribute(MySqlNode.TEMPLATE_CONFIGURATION_URL));
        Reader configContents = new StringReader(configScriptContents);

        getMachine().copyTo(configContents, Urls.mergePaths(getRunDir(), getConfigFile()));
    }

    /**
     * Copies the entity's database creation script, if any, to {@code creation-script.sql}
     * in the run directory, going through a local temporary file that is deleted afterwards.
     *
     * @return {@code true} if a script was copied, {@code false} if the entity has none
     */
    protected boolean copyDatabaseCreationScript() {
        String creationScriptContents = DatastoreMixins.getDatabaseCreationScriptAsString(entity);
        if (creationScriptContents==null) return false;

        File templateFile = null;
        BufferedWriter writer = null;
        try {
            templateFile = File.createTempFile("mysql", null);
            FileUtil.setFilePermissionsTo600(templateFile);
            writer = new BufferedWriter(new FileWriter(templateFile));
            writer.write(creationScriptContents);
            writer.flush();
            copyTemplate(templateFile.getAbsoluteFile(), getRunDir() + "/creation-script.sql");
        } catch (IOException e) {
            throw Exceptions.propagate(e);
        } finally {
            if (writer != null) Streams.closeQuietly(writer);
            if (templateFile != null) templateFile.delete();
        }
        return true;
    }

    /**
     * Renders the {@code MYSQL_SERVER_CONF} map as configuration lines, one per entry:
     * {@code key = value}, or just {@code key} when the value is null or empty.
     */
    public String getMySqlServerOptionsString() {
        Map<String, Object> options = entity.getConfig(MySqlNode.MYSQL_SERVER_CONF);
        StringBuilder result = new StringBuilder();
        if (groovyTruth(options)) {
            for (Map.Entry<String, Object> entry : options.entrySet()) {
                result.append(entry.getKey());
                // A null value is treated like an empty one (a bare flag) instead of throwing.
                Object rawValue = entry.getValue();
                String value = (rawValue == null) ? "" : rawValue.toString();
                if (!Strings.isEmpty(value)) {
                    result.append(" = ").append(value);
                }
                result.append('\n');
            }
        }
        return result.toString();
    }

    /** Publishes the pid file location and starts {@code mysqld} in the background. */
    @Override
    public void launch() {
        entity.setAttribute(MySqlNode.PID_FILE, getRunDir() + "/" + AbstractSoftwareProcessSshDriver.PID_FILENAME);
        newScript(MutableMap.of("usePidFile", true), LAUNCHING)
            .updateTaskAndFailOnNonZeroResultCode()
            .body.append(format("nohup %s/bin/mysqld --defaults-file=%s --user=`whoami` > %s 2>&1 < /dev/null &", getBaseDir(), getConfigFile(), getLogFile()))
            .execute();
    }

    /** Returns {@code true} when the status command exits with code zero. */
    @Override
    public boolean isRunning() {
        return newScript(MutableMap.of("usePidFile", false), CHECK_RUNNING)
            .body.append(getStatusCmd())
            .execute() == 0;
    }

    @Override
    public void stop() {
        newScript(MutableMap.of("usePidFile", true), STOPPING).execute();
    }

    @Override
    public void kill() {
        newScript(MutableMap.of("usePidFile", true), KILLING).execute();
    }

    /** Returns the {@code mysqladmin ... status} command line, using the config file in the run directory. */
    @Override
    public String getStatusCmd() {
        return format("%s/bin/mysqladmin --defaults-file=%s status", getBaseDir(), Urls.mergePaths(getRunDir(), getConfigFile()));
    }

    /**
     * Queues a task that uploads {@code commands} to a randomly named file in the run
     * directory, then queues its execution.
     */
    @Override
    public ProcessTaskWrapper<Integer> executeScriptAsync(String commands) {
        String filename = "mysql-commands-"+Identifiers.makeRandomId(8);
        DynamicTasks.queue(SshEffectorTasks.put(Urls.mergePaths(getRunDir(), filename)).contents(commands).summary("copying datastore script to execute "+filename));
        return executeScriptFromInstalledFileAsync(filename);
    }

    /**
     * Queues a task that feeds a file already present in the run directory to the
     * {@code mysql} client; the task requires exit code zero.
     */
    public ProcessTaskWrapper<Integer> executeScriptFromInstalledFileAsync(String filenameAlreadyInstalledAtServer) {
        return DynamicTasks.queue(
                SshEffectorTasks.ssh(
                                "cd "+getRunDir(),
                                getBaseDir()+"/bin/mysql --defaults-file="+getConfigFile()+" < "+filenameAlreadyInstalledAtServer)
                        .requiringExitCodeZero()
                        .summary("executing datastore script "+filenameAlreadyInstalledAtServer));
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlDriver.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlDriver.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlDriver.java new file mode 100644 index 0000000..c1df992 --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlDriver.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.database.postgresql; + +import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper; + +import brooklyn.entity.basic.SoftwareProcessDriver; + +/** + * The {@link brooklyn.entity.basic.SoftwareProcessDriver} for PostgreSQL.
+ */
public interface PostgreSqlDriver extends SoftwareProcessDriver {

    /**
     * Returns a command line as a string.
     * NOTE(review): presumably a shell command that reports whether the server is
     * running, as with the MySQL driver's method of the same name - confirm against
     * the implementing driver.
     */
    String getStatusCmd();

    /**
     * Executes the given script commands and returns the wrapper for the resulting process
     * task; {@code PostgreSqlNodeImpl.executeScript} blocks on it and reads its standard output.
     */
    ProcessTaskWrapper<Integer> executeScriptAsync(String commands);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNode.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNode.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNode.java new file mode 100644 index 0000000..7d195f7 --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNode.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.brooklyn.entity.database.postgresql; + +import org.apache.brooklyn.api.catalog.Catalog; +import org.apache.brooklyn.api.entity.Effector; +import org.apache.brooklyn.api.entity.proxying.ImplementedBy; +import org.apache.brooklyn.api.entity.trait.HasShortName; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import org.apache.brooklyn.entity.database.DatabaseNode; +import org.apache.brooklyn.entity.database.DatastoreMixins; +import org.apache.brooklyn.entity.database.DatastoreMixins.DatastoreCommon; +import brooklyn.entity.effector.Effectors; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; + +import org.apache.brooklyn.location.basic.PortRanges; + +/** + * PostgreSQL database node entity. + * <p> + * <ul> + * <li>You may need to increase shared memory settings in the kernel depending on the setting of + * the {@link #SHARED_MEMORY} key. The minimum value is <em>128kB</em>. See the PostgreSQL + * <a href="http://www.postgresql.org/docs/9.1/static/kernel-resources.html">documentation</a>. + * <li>You will also need to enable passwordless sudo. 
+ * </ul> + */
@Catalog(name="PostgreSQL Node", description="PostgreSQL is an object-relational database management system (ORDBMS)", iconUrl="classpath:///postgresql-logo-200px.png")
@ImplementedBy(PostgreSqlNodeImpl.class)
public interface PostgreSqlNode extends SoftwareProcess, HasShortName, DatastoreCommon, DatabaseNode {

    // Overrides the default of the inherited version key; the previous default is kept in the trailing comment.
    @SetFromFlag("version")
    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "9.3-1");//"9.1-4");

    // Optional URL of a complete PostgreSQL configuration file (no default).
    @SetFromFlag("configFileUrl")
    ConfigKey<String> CONFIGURATION_FILE_URL = ConfigKeys.newStringConfigKey(
            "postgresql.config.file.url", "URL where PostgreSQL configuration file can be found; "
                + "if not supplied the blueprint uses the default and customises it");

    // Optional URL of a host-based authentication configuration file (no default).
    @SetFromFlag("authConfigFileUrl")
    ConfigKey<String> AUTHENTICATION_CONFIGURATION_FILE_URL = ConfigKeys.newStringConfigKey(
            "postgresql.authConfig.file.url", "URL where PostgreSQL host-based authentication configuration file can be found; "
                + "if not supplied the blueprint uses the default and customises it");

    // Port for the server, as both config key and sensor; the default range is "5432+".
    @SetFromFlag("port")
    PortAttributeSensorAndConfigKey POSTGRESQL_PORT = new PortAttributeSensorAndConfigKey(
            "postgresql.port", "PostgreSQL port", PortRanges.fromString("5432+"));

    // Shared memory buffer size as a string with a unit suffix; defaults to "4MB".
    @SetFromFlag("sharedMemory")
    ConfigKey<String> SHARED_MEMORY = ConfigKeys.newStringConfigKey(
            "postgresql.sharedMemory", "Size of shared memory buffer (must specify as kB, MB or GB, minimum 128kB)", "4MB");

    // Maximum number of database connections; defaults to 100.
    @SetFromFlag("maxConnections")
    ConfigKey<Integer> MAX_CONNECTIONS = ConfigKeys.newIntegerConfigKey(
            "postgresql.maxConnections", "Maximum number of connections to the database", 100);

    // Whether to stop in immediate mode, dropping current connections; defaults to true.
    @SetFromFlag("disconnectOnStop")
    ConfigKey<Boolean> DISCONNECT_ON_STOP = ConfigKeys.newBooleanConfigKey(
            "postgresql.disconnect.on.stop", "If true, PostgreSQL will immediately disconnet (pg_ctl -m immediate stop) all current connections when the node is stopped", true);

    // Sensor poll period in milliseconds; defaults to 1000.
    @SetFromFlag("pollPeriod")
    ConfigKey<Long> POLL_PERIOD = ConfigKeys.newLongConfigKey(
            "postgresql.sensorpoll", "Poll period (in milliseconds)", 1000L);

    // Abstract effector derived from the shared datastore EXECUTE_SCRIPT effector, with a
    // PostgreSQL-specific description; implementations supply the body.
    Effector<String> EXECUTE_SCRIPT = Effectors.effector(DatastoreMixins.EXECUTE_SCRIPT)
            .description("Executes the given script contents using psql")
            .buildAbstract();

    Integer getPostgreSqlPort();
    String getSharedMemory();
    Integer getMaxConnections();

    /** Executes the given script commands against the database and returns a string result. */
    String executeScript(String commands);

}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNodeChefImplFromScratch.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNodeChefImplFromScratch.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNodeChefImplFromScratch.java new file mode 100644 index 0000000..e99714a --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNodeChefImplFromScratch.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.brooklyn.entity.database.postgresql; + +import org.apache.brooklyn.api.entity.Effector; +import org.apache.brooklyn.core.util.ResourceUtils; +import org.apache.brooklyn.core.util.config.ConfigBag; +import org.apache.brooklyn.core.util.task.DynamicTasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.EffectorStartableImpl; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.chef.ChefConfig; +import brooklyn.entity.chef.ChefLifecycleEffectorTasks; +import brooklyn.entity.chef.ChefServerTasks; +import brooklyn.entity.effector.EffectorBody; +import brooklyn.entity.effector.Effectors; +import brooklyn.entity.software.SshEffectorTasks; +import brooklyn.event.feed.ssh.SshFeed; +import brooklyn.event.feed.ssh.SshPollConfig; + +import org.apache.brooklyn.location.basic.Locations; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import brooklyn.util.collections.Jsonya; +import brooklyn.util.guava.Maybe; +import brooklyn.util.ssh.BashCommands; +
/**
 * A {@link PostgreSqlNode} implementation that installs and starts PostgreSQL through
 * Chef (knife) instead of a software-process driver.
 */
public class PostgreSqlNodeChefImplFromScratch extends EffectorStartableImpl implements PostgreSqlNode {

    private static final Logger LOG = LoggerFactory.getLogger(PostgreSqlNodeChefImplFromScratch.class);

    /** Concrete {@code executeScript} effector taking the script text in the {@code script} parameter. */
    public static final Effector<String> EXECUTE_SCRIPT = Effectors.effector(String.class, "executeScript")
            .description("invokes a script")
            .parameter(ExecuteScriptEffectorBody.SCRIPT)
            .impl(new ExecuteScriptEffectorBody()).build();

    /** SSH poller created in {@link #connectSensors()} and stopped in {@link #disconnectSensors()}. */
    private SshFeed feed;

    /** Attaches the Chef-based lifecycle effectors to this entity. */
    @Override
    public void init() {
        super.init();
        new ChefPostgreSqlLifecycle().attachLifecycleEffectors(this);
    }

    @Override
    public Integer getPostgreSqlPort() { return getAttribute(POSTGRESQL_PORT); }

    @Override
    public String getSharedMemory() { return getConfig(SHARED_MEMORY); }

    @Override
    public Integer getMaxConnections() { return getConfig(MAX_CONNECTIONS); }

    @Override
    public String getShortName() {
        return "PostgreSQL";
    }

    /** Lifecycle tasks that converge the {@code postgresql::server} recipe with knife. */
    public static class ChefPostgreSqlLifecycle extends ChefLifecycleEffectorTasks {
        {
            usePidFile("/var/run/postgresql/*.pid");
            useService("postgresql");
        }

        /** Queues the knife converge, passing the port, listen address and pg_hba rule as attributes. */
        protected void startWithKnifeAsync() {
            Entities.warnOnIgnoringConfig(entity(), ChefConfig.CHEF_LAUNCH_RUN_LIST);
            Entities.warnOnIgnoringConfig(entity(), ChefConfig.CHEF_LAUNCH_ATTRIBUTES);

            DynamicTasks.queue(
                    ChefServerTasks
                        .knifeConvergeRunList("postgresql::server")
                        .knifeAddAttributes(Jsonya
                            .at("postgresql", "config").add(
                                "port", entity().getPostgreSqlPort(),
                                "listen_addresses", "*").getRootMap())
                        .knifeAddAttributes(Jsonya
                            .at("postgresql", "pg_hba").list().map().add(
                                "type", "host", "db", "all", "user", "all",
                                "addr", "0.0.0.0/0", "method", "md5").getRootMap())
                        // no other arguments currently supported; chef will pick a password for us
                );
        }

        /** Runs the configured creation script, if there is one, then connects the sensors. */
        @Override
        protected void postStartCustom() {
            super.postStartCustom();

            // now run the creation script
            String creationScript;
            String creationScriptUrl = entity().getConfig(PostgreSqlNode.CREATION_SCRIPT_URL);
            if (creationScriptUrl != null) {
                creationScript = ResourceUtils.create(entity()).getResourceAsString(creationScriptUrl);
            } else {
                creationScript = entity().getConfig(PostgreSqlNode.CREATION_SCRIPT_CONTENTS);
            }
            // Neither key needs to be set; skip the effector call rather than piping null into psql.
            if (creationScript != null && creationScript.trim().length() > 0) {
                entity().executeScript(creationScript);
            }

            // and finally connect sensors
            entity().connectSensors();
        }

        @Override
        protected void preStopCustom() {
            entity().disconnectSensors();
            super.preStopCustom();
        }

        @Override
        protected PostgreSqlNodeChefImplFromScratch entity() {
            return (PostgreSqlNodeChefImplFromScratch) super.entity();
        }
    }

    /** Effector body that pipes the script text into {@code psql --file -} run as user {@code postgres}. */
    public static class ExecuteScriptEffectorBody extends EffectorBody<String> {
        public static final ConfigKey<String> SCRIPT = ConfigKeys.newStringConfigKey("script", "contents of script to run");

        /** Queues the SSH task, requires exit code zero and returns its standard output. */
        public String call(ConfigBag parameters) {
            return DynamicTasks.queue(SshEffectorTasks.ssh(
                    BashCommands.pipeTextTo(
                        parameters.get(SCRIPT),
                        BashCommands.sudoAsUser("postgres", "psql --file -")))
                    .requiringExitCodeZero()).getStdout();
        }
    }

    /**
     * Publishes the datastore URL and, when the entity runs on a single SSH machine, starts a
     * feed that sets {@code SERVICE_UP} from a process-list check. Without an SSH machine no
     * polling is possible, so {@code SERVICE_UP} is set to {@code true} straight away.
     */
    protected void connectSensors() {
        setAttribute(DATASTORE_URL, String.format("postgresql://%s:%s/", getAttribute(HOSTNAME), getAttribute(POSTGRESQL_PORT)));

        Maybe<SshMachineLocation> machine = Locations.findUniqueSshMachineLocation(getLocations());

        if (machine.isPresent()) {
            feed = SshFeed.builder()
                    .entity(this)
                    .machine(machine.get())
                    .poll(new SshPollConfig<Boolean>(SERVICE_UP)
                            .command("ps -ef | grep [p]ostgres")
                            .setOnSuccess(true)
                            .setOnFailureOrException(false))
                    .build();
        } else {
            LOG.warn("Location(s) {} not an ssh-machine location, so not polling for status; setting serviceUp immediately", getLocations());
            // Do what the log message says; previously the attribute was never set on this path.
            setAttribute(SERVICE_UP, true);
        }
    }

    /** Stops the SSH feed, if one was started. */
    protected void disconnectSensors() {
        if (feed != null) feed.stop();
    }

    /** Invokes this class's {@link #EXECUTE_SCRIPT} effector on this entity and waits for the result. */
    @Override
    public String executeScript(String commands) {
        return Entities.invokeEffector(this, this, EXECUTE_SCRIPT,
                ConfigBag.newInstance().configure(ExecuteScriptEffectorBody.SCRIPT, commands).getAllConfig()).getUnchecked();
    }

    @Override
    public void populateServiceNotUpDiagnostics() {
        // TODO no-op currently; should check ssh'able etc
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNodeImpl.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNodeImpl.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNodeImpl.java new file mode 100644 index 0000000..5cc9fd8 --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlNodeImpl.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.database.postgresql; + +import org.apache.brooklyn.core.util.config.ConfigBag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.entity.effector.EffectorBody; +
/**
 * Driver-backed implementation of {@link PostgreSqlNode}: lifecycle work is delegated to a
 * {@link PostgreSqlDriver}.
 */
public class PostgreSqlNodeImpl extends SoftwareProcessImpl implements PostgreSqlNode {

    private static final Logger LOG = LoggerFactory.getLogger(PostgreSqlNodeImpl.class);

    /** Returns the driver interface for this entity. */
    @Override
    public Class<?> getDriverInterface() {
        return PostgreSqlDriver.class;
    }

    @Override
    public PostgreSqlDriver getDriver() {
        return (PostgreSqlDriver) super.getDriver();
    }

    @Override
    public Integer getPostgreSqlPort() {
        return getAttribute(POSTGRESQL_PORT);
    }

    @Override
    public String getSharedMemory() {
        return getConfig(SHARED_MEMORY);
    }

    @Override
    public Integer getMaxConnections() {
        return getConfig(MAX_CONNECTIONS);
    }

    /** Registers the body of the {@code EXECUTE_SCRIPT} effector, which delegates to {@link #executeScript(String)}. */
    @Override
    public void init() {
        super.init();
        EffectorBody<String> scriptRunner = new EffectorBody<String>() {
            @Override
            public String call(ConfigBag parameters) {
                Object script = parameters.getStringKey("commands");
                return executeScript((String) script);
            }
        };
        getMutableEntityType().addEffector(EXECUTE_SCRIPT, scriptRunner);
    }

    /** Connects the service-up check and then publishes the datastore URL. */
    @Override
    protected void connectSensors() {
        super.connectSensors();
        connectServiceUpIsRunning();
        String datastoreUrl = String.format("postgresql://%s:%s/", getAttribute(HOSTNAME), getAttribute(POSTGRESQL_PORT));
        setAttribute(DATASTORE_URL, datastoreUrl);
    }

    /** Disconnects the service-up check before the superclass disconnects its own sensors. */
    @Override
    protected void disconnectSensors() {
        disconnectServiceUpIsRunning();
        super.disconnectSensors();
    }

    @Override
    public String getShortName() {
        return "PostgreSQL";
    }

    /** Runs the commands through the driver, blocks until the task finishes and returns its standard output. */
    @Override
    public String executeScript(String commands) {
        return getDriver().executeScriptAsync(commands).block().getStdout();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ac1a7c09/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlSpecs.java ---------------------------------------------------------------------- diff --git a/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlSpecs.java b/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlSpecs.java new file mode 100644 index 0000000..5e24275 --- /dev/null +++ b/software/database/src/main/java/org/apache/brooklyn/entity/database/postgresql/PostgreSqlSpecs.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
package org.apache.brooklyn.entity.database.postgresql;

import org.apache.brooklyn.api.entity.proxying.EntitySpec;

import brooklyn.entity.chef.ChefConfig;
import brooklyn.entity.chef.ChefConfig.ChefModes;

/**
 * Static factory methods producing {@link EntitySpec}s for {@link PostgreSqlNode}.
 * Not instantiable.
 */
public class PostgreSqlSpecs {

    private PostgreSqlSpecs() {}

    /** @return a spec for {@link PostgreSqlNode} with no explicit implementation class. */
    public static EntitySpec<PostgreSqlNode> spec() {
        return EntitySpec.create(PostgreSqlNode.class);
    }

    /**
     * Requires {@code knife}.
     *
     * @return a spec for {@link PostgreSqlNode} implemented by
     *         {@link PostgreSqlNodeChefImplFromScratch}, with {@link ChefConfig#CHEF_MODE}
     *         set to {@link ChefModes#KNIFE}
     */
    public static EntitySpec<PostgreSqlNode> specChef() {
        return EntitySpec.create(PostgreSqlNode.class, PostgreSqlNodeChefImplFromScratch.class)
                .configure(ChefConfig.CHEF_MODE, ChefModes.KNIFE);
    }
}
