Move common driver code to shared parent class
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/df84b662 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/df84b662 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/df84b662 Branch: refs/heads/0.5.0 Commit: df84b662b5d8dfb757f713ae997065d4a8f7882d Parents: a24e0e4 Author: Andrew Kennedy <[email protected]> Authored: Thu Mar 21 02:22:57 2013 +0000 Committer: Andrew Kennedy <[email protected]> Committed: Fri Apr 19 10:36:07 2013 +0100 ---------------------------------------------------------------------- .../kafka/AbstractfKafkaSshDriver.java | 170 +++++++++++++++++++ .../messaging/kafka/KafkaBrokerSshDriver.java | 129 ++------------ .../kafka/KafkaZookeeperSshDriver.java | 129 ++------------ 3 files changed, 198 insertions(+), 230 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/df84b662/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java new file mode 100644 index 0000000..f6c7c8d --- /dev/null +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java @@ -0,0 +1,170 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package brooklyn.entity.messaging.kafka; + +import static java.lang.String.format; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.BrooklynVersion; +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.entity.basic.lifecycle.CommonCommands; +import brooklyn.entity.drivers.downloads.DownloadResolver; +import brooklyn.entity.java.JavaSoftwareProcessSshDriver; +import brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.MutableMap; +import brooklyn.util.NetworkUtils; +import brooklyn.util.ResourceUtils; +import brooklyn.util.jmx.jmxrmi.JmxRmiAgent; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriver { + + private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperSshDriver.class); + + public AbstractfKafkaSshDriver(EntityLocal entity, SshMachineLocation machine) { + super(entity, machine); + } + + protected abstract Map<String, Integer> getPortMap(); + + protected abstract ConfigKey<String> getConfigTemplateKey(); + + protected abstract String getConfigFileName(); + + protected abstract String getLaunchScriptName(); + + protected abstract String getProcessIdentifier(); + + private String expandedInstallDir; + + @Override + protected String getLogFileLocation() { return getRunDir()+"/console.out"; } + + private String getExpandedInstallDir() { + if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called"); + return expandedInstallDir; + } + + @Override + public void install() { + DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this); + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion())); + + List<String> commands = new LinkedList<String>(); + commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs)); + commands.add(CommonCommands.INSTALL_TAR); + commands.add("tar xzfv "+saveAs); + commands.add("cd "+expandedInstallDir); + commands.add("./sbt update"); + commands.add("./sbt package"); + + newScript(INSTALLING) + .failOnNonZeroResultCode() + .body.append(commands) + .execute(); + } + + @Override + public void customize() { + NetworkUtils.checkPortsValid(getPortMap()); + newScript(CUSTOMIZING) + .failOnNonZeroResultCode() + .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir())) + .execute(); + + String config = entity.getConfig(getConfigTemplateKey()); + copyTemplate(config, getConfigFileName()); + + // Copy JMX agent Jar to server + getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath()); + } + + public String getJmxRmiAgentJarBasename() { + return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar"; + } + + public String getJmxRmiAgentJarUrl() { + return "classpath://" + getJmxRmiAgentJarBasename(); + } + + public String getJmxRmiAgentJarDestinationFilePath() { + return getRunDir() + "/" + getJmxRmiAgentJarBasename(); + } + + @Override + public void launch() { + newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING) + .failOnNonZeroResultCode() + .body.append(String.format("nohup ./bin/%s ./%s > console.out 2>&1 &", getLaunchScriptName(), getConfigFileName())) + .execute(); + } + + public String getPidFile() { return getRunDir() + "/kafka.pid"; } + + @Override + public boolean isRunning() { + return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0; + } + + @Override + public void stop() { + newScript(ImmutableMap.of("usePidFile", false), STOPPING) + .body.append(String.format("ps ax | grep %s | awk '{print $1}' | xargs kill", getProcessIdentifier())) + .body.append(String.format("ps ax | grep %s | awk '{print $1}' | xargs kill -9", getProcessIdentifier())) + .execute(); + } + + @Override + protected Map<String, ?> getJmxJavaSystemProperties() { + return MutableMap.<String, Object> builder() + .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort()) + .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort()) + .put("com.sun.management.jmxremote.ssl", false) + .put("com.sun.management.jmxremote.authenticate", false) + .put("java.rmi.server.hostname", getHostname()) + .build(); + } + + @Override + protected List<String> getJmxJavaConfigOptions() { + return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath()); + } + + /** + * Use RMI agent to provide JMX. + */ + @Override + public Map<String, String> getShellEnvironment() { + Map<String, String> orig = super.getShellEnvironment(); + String kafkaJmxOpts = orig.remove("JAVA_OPTS"); + return MutableMap.<String, String>builder() + .putAll(orig) + .put("KAFKA_JMX_OPTS", kafkaJmxOpts) + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/df84b662/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java index b3cd0f0..40e7234 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java @@ -15,147 +15,46 @@ */ package brooklyn.entity.messaging.kafka; -import static java.lang.String.format; - -import java.util.LinkedList; -import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.BrooklynVersion; -import brooklyn.entity.basic.lifecycle.CommonCommands; -import brooklyn.entity.drivers.downloads.DownloadResolver; -import brooklyn.entity.java.JavaSoftwareProcessSshDriver; +import brooklyn.config.ConfigKey; import brooklyn.location.basic.SshMachineLocation; import brooklyn.util.MutableMap; -import brooklyn.util.NetworkUtils; -import brooklyn.util.ResourceUtils; -import brooklyn.util.jmx.jmxrmi.JmxRmiAgent; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -public class KafkaBrokerSshDriver extends JavaSoftwareProcessSshDriver implements KafkaBrokerDriver { - private static final Logger log = LoggerFactory.getLogger(KafkaBrokerSshDriver.class); - - private String expandedInstallDir; +public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements KafkaBrokerDriver { public KafkaBrokerSshDriver(KafkaBrokerImpl entity, SshMachineLocation machine) { super(entity, machine); } @Override - protected String getLogFileLocation() { return getRunDir()+"/console.out"; } - - @Override - public Integer getKafkaPort() { return entity.getAttribute(KafkaBroker.KAFKA_PORT); } - - private String getExpandedInstallDir() { - if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called"); - return expandedInstallDir; - } - - @Override - public void install() { - DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this); - List<String> urls = resolver.getTargets(); - String saveAs = resolver.getFilename(); - expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion())); - - List<String> commands = new LinkedList<String>(); - commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs)); - commands.add(CommonCommands.INSTALL_TAR); - commands.add("tar xzfv "+saveAs); - commands.add("cd "+expandedInstallDir); - commands.add("./sbt update"); - commands.add("./sbt package"); - - newScript(INSTALLING) - .failOnNonZeroResultCode() - .body.append(commands) - .execute(); - } - - @Override - public void customize() { - NetworkUtils.checkPortsValid(MutableMap.of("kafkaPort", getKafkaPort())); - newScript(CUSTOMIZING) - .failOnNonZeroResultCode() - .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir())) - .execute(); - - String serverConfig = entity.getConfig(KafkaBroker.SERVER_CONFIG_TEMPLATE); - copyTemplate(serverConfig, "server.properties"); - - // Copy JMX agent Jar to server - getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath()); - } - - public String getJmxRmiAgentJarBasename() { - return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar"; - } - - public String getJmxRmiAgentJarUrl() { - return "classpath://" + getJmxRmiAgentJarBasename(); - } - - public String getJmxRmiAgentJarDestinationFilePath() { - return getRunDir() + "/" + getJmxRmiAgentJarBasename(); - } - - @Override - public void launch() { - newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING) - .failOnNonZeroResultCode() - .body.append("nohup ./bin/kafka-server-start.sh ./server.properties > console.out 2>&1 &") - .execute(); + protected Map<String, Integer> getPortMap() { + return MutableMap.of("kafkaPort", getKafkaPort()); } - public String getPidFile() { return getRunDir() + "/kafka.pid"; } - @Override - public boolean isRunning() { - return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0; + protected ConfigKey<String> getConfigTemplateKey() { + return KafkaBroker.SERVER_CONFIG_TEMPLATE; } @Override - public void stop() { - newScript(ImmutableMap.of("usePidFile", false), STOPPING) - .body.append("ps ax | grep kafka\\.Kafka | awk '{print $1}' | xargs kill") - .body.append("ps ax | grep kafka\\.Kafka | awk '{print $1}' | xargs kill -9") - .execute(); + protected String getConfigFileName() { + return "server.properties"; } @Override - protected Map<String, ?> getJmxJavaSystemProperties() { - return MutableMap.<String, Object> builder() - .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort()) - .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort()) - .put("com.sun.management.jmxremote.ssl", false) - .put("com.sun.management.jmxremote.authenticate", false) - .put("java.rmi.server.hostname", getHostname()) - .build(); + protected String getLaunchScriptName() { + return "kafka-server-start.sh"; } @Override - protected List<String> getJmxJavaConfigOptions() { - return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath()); + protected String getProcessIdentifier() { + return "kafka\\.Kafka"; } - /** - * Use RMI agent to provide JMX. - */ @Override - public Map<String, String> getShellEnvironment() { - Map<String, String> orig = super.getShellEnvironment(); - String kafkaJmxOpts = orig.remove("JAVA_OPTS"); - return MutableMap.<String, String>builder() - .putAll(orig) - .put("KAFKA_JMX_OPTS", kafkaJmxOpts) - .build(); + public Integer getKafkaPort() { + return getEntity().getAttribute(KafkaBroker.KAFKA_PORT); } } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/df84b662/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java index c62cb0a..a35aab6 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java @@ -15,147 +15,46 @@ */ package brooklyn.entity.messaging.kafka; -import static java.lang.String.format; - -import java.util.LinkedList; -import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.BrooklynVersion; -import brooklyn.entity.basic.lifecycle.CommonCommands; -import brooklyn.entity.drivers.downloads.DownloadResolver; -import brooklyn.entity.java.JavaSoftwareProcessSshDriver; +import brooklyn.config.ConfigKey; import brooklyn.location.basic.SshMachineLocation; import brooklyn.util.MutableMap; -import brooklyn.util.NetworkUtils; -import brooklyn.util.ResourceUtils; -import brooklyn.util.jmx.jmxrmi.JmxRmiAgent; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; - -public class KafkaZookeeperSshDriver extends JavaSoftwareProcessSshDriver implements KafkaZookeeperDriver { - private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperSshDriver.class); - - private String expandedInstallDir; +public class KafkaZookeeperSshDriver extends AbstractfKafkaSshDriver implements KafkaZookeeperDriver { public KafkaZookeeperSshDriver(KafkaZookeeperImpl entity, SshMachineLocation machine) { super(entity, machine); } @Override - protected String getLogFileLocation() { return getRunDir()+"/console.out"; } - - @Override - public Integer getZookeeperPort() { return entity.getAttribute(KafkaZookeeper.ZOOKEEPER_PORT); } - - private String getExpandedInstallDir() { - if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called"); - return expandedInstallDir; - } - - @Override - public void install() { - DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this); - List<String> urls = resolver.getTargets(); - String saveAs = resolver.getFilename(); - expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion())); - - List<String> commands = new LinkedList<String>(); - commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs)); - commands.add(CommonCommands.INSTALL_TAR); - commands.add("tar xzfv "+saveAs); - commands.add("cd "+expandedInstallDir); - commands.add("./sbt update"); - commands.add("./sbt package"); - - newScript(INSTALLING) - .failOnNonZeroResultCode() - .body.append(commands) - .execute(); - } - - @Override - public void customize() { - NetworkUtils.checkPortsValid(MutableMap.of("zookeeperPort", getZookeeperPort())); - newScript(CUSTOMIZING) - .failOnNonZeroResultCode() - .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir())) - .execute(); - - String zookeeperConfig = entity.getConfig(KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE); - copyTemplate(zookeeperConfig, "zookeeper.properties"); - - // Copy JMX agent Jar to server - getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath()); - } - - public String getJmxRmiAgentJarBasename() { - return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar"; - } - - public String getJmxRmiAgentJarUrl() { - return "classpath://" + getJmxRmiAgentJarBasename(); - } - - public String getJmxRmiAgentJarDestinationFilePath() { - return getRunDir() + "/" + getJmxRmiAgentJarBasename(); - } - - @Override - public void launch() { - newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING) - .failOnNonZeroResultCode() - .body.append("nohup ./bin/zookeeper-server-start.sh ./zookeeper.properties > console.out 2>&1 &") - .execute(); + protected Map<String, Integer> getPortMap() { + return MutableMap.of("zookeeperPort", getZookeeperPort()); } - public String getPidFile() { return getRunDir() + "/kafka.pid"; } - @Override - public boolean isRunning() { - return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0; + protected ConfigKey<String> getConfigTemplateKey() { + return KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE; } @Override - public void stop() { - newScript(ImmutableMap.of("usePidFile", false), STOPPING) - .body.append("ps ax | grep quorum\\.QuorumPeerMain | awk '{print $1}' | xargs kill") - .body.append("ps ax | grep quorum\\.QuorumPeerMain | awk '{print $1}' | xargs kill -9") - .execute(); + protected String getConfigFileName() { + return "zookeeper.properties"; } @Override - protected Map<String, ?> getJmxJavaSystemProperties() { - return MutableMap.<String, Object> builder() - .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort()) - .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort()) - .put("com.sun.management.jmxremote.ssl", false) - .put("com.sun.management.jmxremote.authenticate", false) - .put("java.rmi.server.hostname", getHostname()) - .build(); + protected String getLaunchScriptName() { + return "zookeeper-server-start.sh"; } @Override - protected List<String> getJmxJavaConfigOptions() { - return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath()); + protected String getProcessIdentifier() { + return "quorum\\.QuorumPeerMain"; } - /** - * Use RMI agent to provide JMX. - */ @Override - public Map<String, String> getShellEnvironment() { - Map<String, String> orig = super.getShellEnvironment(); - String kafkaJmxOpts = orig.remove("JAVA_OPTS"); - return MutableMap.<String, String>builder() - .putAll(orig) - .put("KAFKA_JMX_OPTS", kafkaJmxOpts) - .build(); + public Integer getZookeeperPort() { + return getEntity().getAttribute(KafkaZookeeper.ZOOKEEPER_PORT); } }
