New entity for Apache Kafka messaging
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/37e890c2 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/37e890c2 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/37e890c2 Branch: refs/heads/0.5.0 Commit: 37e890c2b55ffb46ddc1a1425db9fff2e9b73bb5 Parents: 74e4016 Author: Andrew Kennedy <[email protected]> Authored: Tue Mar 19 18:16:01 2013 +0000 Committer: Andrew Kennedy <[email protected]> Committed: Fri Apr 19 10:36:06 2013 +0100 ---------------------------------------------------------------------- .../brooklyn/entity/messaging/kafka/Kafka.java | 36 ++++ .../entity/messaging/kafka/KafkaBroker.java | 67 +++++++ .../messaging/kafka/KafkaBrokerDriver.java | 24 +++ .../entity/messaging/kafka/KafkaBrokerImpl.java | 171 ++++++++++++++++ .../messaging/kafka/KafkaBrokerSshDriver.java | 163 ++++++++++++++++ .../entity/messaging/kafka/KafkaCluster.java | 124 ++++++++++++ .../messaging/kafka/KafkaClusterImpl.java | 194 +++++++++++++++++++ .../entity/messaging/kafka/KafkaTopic.java | 57 ++++++ .../entity/messaging/kafka/KafkaZookeeper.java | 51 +++++ .../messaging/kafka/KafkaZookeeperDriver.java | 24 +++ .../messaging/kafka/KafkaZookeeperImpl.java | 128 ++++++++++++ .../kafka/KafkaZookeeperSshDriver.java | 121 ++++++++++++ .../entity/messaging/kafka/server.properties | 120 ++++++++++++ .../entity/messaging/kafka/zookeeper.properties | 25 +++ .../activemq/ActiveMQIntegrationTest.groovy | 6 +- .../messaging/kafka/KafkaIntegrationTest.groovy | 120 ++++++++++++ .../entity/messaging/kafka/KafkaLiveTest.java | 31 +++ 17 files changed, 1461 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java ---------------------------------------------------------------------- 
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
new file mode 100644
index 0000000..7f26f8e
--- /dev/null
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * Shared Kafka broker and zookeeper properties.
+ * <p>
+ * Holds the configuration keys that {@link KafkaBroker} inherits by extending this
+ * interface: the Kafka version to install and the URL the distribution is fetched from.
+ */
+public interface Kafka {
+
+    /**
+     * Kafka version key, built from {@link SoftwareProcess#SUGGESTED_VERSION} together with
+     * the value {@code "0.7.2-incubating"} (presumably the default version - confirm against
+     * the {@code BasicConfigKey(key, value)} constructor).
+     */
+    BasicConfigKey<String> SUGGESTED_VERSION = new BasicConfigKey<String>(SoftwareProcess.SUGGESTED_VERSION, "0.7.2-incubating");
+
+    /**
+     * Download location of the Kafka source tarball, settable with the {@code downloadUrl} flag.
+     * The value contains {@code ${version}} placeholders; where they are substituted is not
+     * visible in this file.
+     */
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
+            Attributes.DOWNLOAD_URL, "http://mirror.catn.com/pub/apache/incubator/kafka/kafka-${version}/kafka-${version}-src.tgz");
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
new file mode 100644
index 0000000..13b8d0d
--- /dev/null
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.UsesJmx;
+import brooklyn.entity.messaging.MessageBroker;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a single Kafka broker instance.
+ * <p>
+ * Declares the broker's configuration keys (version, port, server configuration template,
+ * zookeeper) and the sensors that report its identity and request/byte statistics.
+ */
+@ImplementedBy(KafkaBrokerImpl.class)
+public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Kafka {
+
+    /** Re-exposes {@link Kafka#SUGGESTED_VERSION} so it can also be set with the {@code version} flag. */
+    @SetFromFlag("version")
+    BasicConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION;
+
+    /** Port the broker listens on; configured with the port specification {@code "9092+"}. */
+    @SetFromFlag("kafkaPort")
+    PortAttributeSensorAndConfigKey KAFKA_PORT = new PortAttributeSensorAndConfigKey("kafka.port", "Kafka port", "9092+");
+
+    /**
+     * Location of the configuration file template to be copied to the server.
+     * The key's description states that the template is in freemarker format.
+     */
+    @SetFromFlag("serverConfig")
+    BasicConfigKey<String> SERVER_CONFIG_TEMPLATE = new BasicConfigKey<String>(
+            String.class, "kafka.config.server", "Server configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/server.properties");
+
+    /** The {@link KafkaZookeeper} entity associated with this broker. */
+    @SetFromFlag("zookeeper")
+    BasicConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "Kafka zookeeper entity");
+
+    /** Unique broker id, as returned by {@link #getBrokerId()}. */
+    AttributeSensor<Long> BROKER_ID = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.id", "Kafka unique broker ID");
+
+    // Fetch request statistics.
+    BasicAttributeSensor<Long> FETCH_REQUEST_COUNT = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.fetch.total", "Fetch request count");
+    BasicAttributeSensor<Long> TOTAL_FETCH_TIME = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.fetch.time.total", "Total fetch request processing time (millis)");
+    BasicAttributeSensor<Double> MAX_FETCH_TIME = new BasicAttributeSensor<Double>(Double.class, "kafka.broker.fetch.time.max", "Max fetch request processing time (millis)");
+
+    // Produce request statistics.
+    BasicAttributeSensor<Long> PRODUCE_REQUEST_COUNT = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.produce.total", "Produce request count");
+    BasicAttributeSensor<Long> TOTAL_PRODUCE_TIME = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.produce.time.total", "Total produce request processing time (millis)");
+    BasicAttributeSensor<Double> MAX_PRODUCE_TIME = new BasicAttributeSensor<Double>(Double.class, "kafka.broker.produce.time.max", "Max produce request processing time (millis)");
+
+    // Network traffic totals.
+    BasicAttributeSensor<Long> BYTES_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.bytes.received", "Total bytes received");
+    BasicAttributeSensor<Long> BYTES_SENT = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.bytes.sent", "Total bytes sent");
+
+    /** @return the Kafka port of this broker */
+    Integer getKafkaPort();
+
+    /** @return the unique id of this broker */
+    Long getBrokerId();
+
+    /** @return the zookeeper entity associated with this broker */
+    KafkaZookeeper getZookeeper();
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
new file mode 100644
index 0000000..c9caa03
--- /dev/null
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.basic.SoftwareProcessDriver;
+
+/**
+ * Driver contract for a {@link KafkaBroker}: the generic {@link SoftwareProcessDriver}
+ * operations plus access to the broker's Kafka port.
+ */
+public interface KafkaBrokerDriver extends SoftwareProcessDriver {
+
+    /** @return the port of the Kafka broker managed by this driver */
+    Integer getKafkaPort();
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
new file mode 100644
index 0000000..d76072e
--- /dev/null
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package brooklyn.entity.messaging.kafka; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.entity.messaging.MessageBroker; +import brooklyn.event.feed.function.FunctionFeed; +import brooklyn.event.feed.function.FunctionPollConfig; +import brooklyn.event.feed.jmx.JmxAttributePollConfig; +import brooklyn.event.feed.jmx.JmxFeed; +import brooklyn.util.MutableMap; + +import com.google.common.base.Functions; +import com.google.common.base.Objects.ToStringHelper; +import com.google.common.collect.Sets; + +/** + * An {@link brooklyn.entity.Entity} that represents a single Kafka broker instance. + */ +public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroker, KafkaBroker { + private static final Logger log = LoggerFactory.getLogger(KafkaBrokerImpl.class); + + private static final AtomicLong brokers = new AtomicLong(0l); + + public KafkaBrokerImpl() { + super(); + } + public KafkaBrokerImpl(Map<?, ?> properties) { + this(properties, null); + } + public KafkaBrokerImpl(Entity parent) { + this(MutableMap.of(), parent); + } + public KafkaBrokerImpl(Map<?, ?> properties, Entity parent) { + super(properties, parent); + } + + @Override + public void postConstruct() { + setAttribute(BROKER_ID, brokers.incrementAndGet()); + } + + @Override + public Integer getKafkaPort() { return getAttribute(KAFKA_PORT); } + + @Override + public Long getBrokerId() { return getAttribute(BROKER_ID); } + + @Override + public KafkaZookeeper getZookeeper() { return getConfig(ZOOKEEPER); } + + public KafkaTopic createTopic(Map properties) { + KafkaTopic result = new KafkaTopic(properties, this); + Entities.manage(result); + 
result.create(); + return result; + } + + @Override + public Class getDriverInterface() { + return KafkaBrokerDriver.class; + } + + @Override + protected Collection<Integer> getRequiredOpenPorts() { + Set<Integer> ports = Sets.newLinkedHashSet(super.getRequiredOpenPorts()); + ports.add(getAttribute(KAFKA_PORT)); + log.debug("getRequiredOpenPorts detected expanded ports {} for {}", ports, this); + return ports; + } + + private volatile FunctionFeed functionFeed; + private volatile JmxFeed jmxFeed; + + @Override + protected void connectSensors() { + String socketServerStatsMbean = "kafka:type=kafka.SocketServerStats"; + + functionFeed = FunctionFeed.builder() + .entity(this) + .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP) + .period(500, TimeUnit.MILLISECONDS) + .callable(new Callable<Boolean>() { + public Boolean call() throws Exception { + return getDriver().isRunning(); + } + }) + .onError(Functions.constant(Boolean.FALSE))) + .build(); + + jmxFeed = JmxFeed.builder() + .entity(this) + .period(500, TimeUnit.MILLISECONDS) + .pollAttribute(new JmxAttributePollConfig<Long>(FETCH_REQUEST_COUNT) + .objectName(socketServerStatsMbean) + .attributeName("NumFetchRequests") + .onError(Functions.constant(-1l))) + .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_FETCH_TIME) + .objectName(socketServerStatsMbean) + .attributeName("TotalFetchRequestMs") + .onError(Functions.constant(-1l))) + .pollAttribute(new JmxAttributePollConfig<Double>(MAX_FETCH_TIME) + .objectName(socketServerStatsMbean) + .attributeName("MaxFetchRequestMs") + .onError(Functions.constant(-1.0d))) + .pollAttribute(new JmxAttributePollConfig<Long>(PRODUCE_REQUEST_COUNT) + .objectName(socketServerStatsMbean) + .attributeName("NumProduceRequests") + .onError(Functions.constant(-1l))) + .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_PRODUCE_TIME) + .objectName(socketServerStatsMbean) + .attributeName("TotalProduceRequestMs") + .onError(Functions.constant(-1l))) + .pollAttribute(new 
JmxAttributePollConfig<Double>(MAX_PRODUCE_TIME) + .objectName(socketServerStatsMbean) + .attributeName("MaxProduceRequestMs") + .onError(Functions.constant(-1.0d))) + .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_RECEIVED) + .objectName(socketServerStatsMbean) + .attributeName("TotalBytesRead") + .onError(Functions.constant(-1l))) + .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_SENT) + .objectName(socketServerStatsMbean) + .attributeName("TotalBytesWritten") + .onError(Functions.constant(-1l))) + .build(); + } + + @Override + public void disconnectSensors() { + super.disconnectSensors(); + if (functionFeed != null) functionFeed.stop(); + if (jmxFeed != null) jmxFeed.stop(); + } + + @Override + protected ToStringHelper toStringHelper() { + return super.toStringHelper().add("kafkaPort", getKafkaPort()); + } + + @Override + public void setBrokerUrl() { + // TODO + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java new file mode 100644 index 0000000..e6c9e4e --- /dev/null +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java @@ -0,0 +1,163 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package brooklyn.entity.messaging.kafka; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.lang.String.format; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.BrooklynVersion; +import brooklyn.entity.basic.lifecycle.CommonCommands; +import brooklyn.entity.drivers.downloads.DownloadResolver; +import brooklyn.entity.java.JavaSoftwareProcessSshDriver; +import brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.MutableMap; +import brooklyn.util.NetworkUtils; +import brooklyn.util.ResourceUtils; +import brooklyn.util.jmx.jmxrmi.JmxRmiAgent; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class KafkaBrokerSshDriver extends JavaSoftwareProcessSshDriver implements KafkaBrokerDriver { + + private static final Logger log = LoggerFactory.getLogger(KafkaBrokerSshDriver.class); + + private String expandedInstallDir; + + public KafkaBrokerSshDriver(KafkaBrokerImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + protected String getLogFileLocation() { return getRunDir()+"/kafka-log"; } + + @Override + public Integer getKafkaPort() { return entity.getAttribute(KafkaBroker.KAFKA_PORT); } + + private String getExpandedInstallDir() { + if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called"); + return 
expandedInstallDir; + } + + @Override + public void install() { + DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this); + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion())); + + List<String> commands = new LinkedList<String>(); + commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs)); + commands.add(CommonCommands.INSTALL_TAR); + commands.add("tar xzfv "+saveAs); + commands.add("cd "+expandedInstallDir); + commands.add("./sbt update"); + commands.add("./sbt package"); + + newScript(INSTALLING) + .failOnNonZeroResultCode() + .body.append(commands) + .execute(); + } + + @Override + public void customize() { + NetworkUtils.checkPortsValid(MutableMap.of("kafkaPort", getKafkaPort())); + newScript(CUSTOMIZING) + .failOnNonZeroResultCode() + .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir())) + .execute(); + + String serverConfig = entity.getConfig(KafkaBroker.SERVER_CONFIG_TEMPLATE); + copyTemplate(serverConfig, "server.properties"); + + // Copy JMX agent Jar to server + getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath()); + } + + public String getJmxRmiAgentJarBasename() { + return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar"; + } + + public String getJmxRmiAgentJarUrl() { + return "classpath://" + getJmxRmiAgentJarBasename(); + } + + public String getJmxRmiAgentJarDestinationFilePath() { + return getRunDir() + "/" + getJmxRmiAgentJarBasename(); + } + + @Override + public void launch() { + newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING) + .failOnNonZeroResultCode() + .body.append("nohup ./bin/kafka-server-start.sh ./server.properties > console.out 2>&1 &") + .execute(); + } + + public String getPidFile() { return getRunDir() + 
"/kafka.pid"; } + + @Override + public boolean isRunning() { + return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0; + } + + @Override + public void stop() { + newScript(ImmutableMap.of("usePidFile", false), STOPPING) + .body.append("ps ax | grep kafka\\.Kafka | awk '{print $1}' | xargs kill") + .body.append("ps ax | grep kafka\\.Kafka | awk '{print $1}' | xargs kill -9") + .execute(); + } + + @Override + protected Map<String, ?> getJmxJavaSystemProperties() { + return MutableMap.<String, Object> builder() + .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort()) + .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort()) + .put("com.sun.management.jmxremote.ssl", false) + .put("com.sun.management.jmxremote.authenticate", false) + .put("java.rmi.server.hostname", getHostname()) + .build(); + } + + @Override + protected List<String> getJmxJavaConfigOptions() { + return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath()); + } + + /** + * Use RMI agent to provide JMX. + */ + @Override + public Map<String, String> getShellEnvironment() { + Map<String, String> orig = super.getShellEnvironment(); + String kafkaJmxOpts = orig.remove("JAVA_OPTS"); + return MutableMap.<String, String>builder() + .putAll(orig) + .put("KAFKA_JMX_OPTS", kafkaJmxOpts) + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java new file mode 100644 index 0000000..45843f3 --- /dev/null +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java @@ -0,0 +1,124 @@ +/* + * Copyright 2013 by Cloudsoft Corp. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package brooklyn.entity.messaging.kafka; + +import brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.ConfigurableEntityFactory; +import brooklyn.entity.group.Cluster; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.BasicEntitySpec; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.entity.trait.Resizable; +import brooklyn.entity.trait.Startable; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.util.flags.SetFromFlag; + +/** + * This entity contains the sub-groups and entities that go in to a single location (e.g. datacenter) + * to provide Kafka cluster functionality. + * <p> + * You can customise the broker by customising the factory (by reference in calling code) + * or supplying your own factory (as a config flag). 
+ * <p> + * The contents of this group entity are: + * <ul> + * <li>a {@link brooklyn.entity.group.DynamicCluster} of {@link KafkaBroker}s + * <li>a {@link KafkaZookeeper} + * <li>a {@link brooklyn.policy.Policy} to resize the DynamicCluster + * </ul> + */ +@SuppressWarnings({ "unchecked", "rawtypes" }) +@Catalog(name="Kafka", description="Apache Kafka is a distributed publish-subscribe messaging system") +@ImplementedBy(KafkaClusterImpl.class) +public interface KafkaCluster extends Entity, Startable, Resizable { + + public static class Spec<T extends KafkaCluster, S extends Spec<T,S>> extends BasicEntitySpec<T,S> { + + private static class ConcreteSpec extends Spec<KafkaCluster, ConcreteSpec> { + ConcreteSpec() { + super(KafkaCluster.class); + } + } + + public static Spec<KafkaCluster, ?> newInstance() { + return new ConcreteSpec(); + } + + protected Spec(Class<T> type) { + super(type); + } + + public S initialSize(int val) { + configure(INITIAL_SIZE, 1); + return self(); + } + + public S zookeeper(KafkaZookeeper val) { + configure(ZOOKEEPER, val); + return self(); + } + + public S brokerSpec(EntitySpec<KafkaBroker> val) { + configure(BROKER_SPEC, val); + return self(); + } + + public S brokerFactory(ConfigurableEntityFactory<KafkaBroker> val) { + configure(BROKER_FACTORY, val); + return self(); + } + } + + @SetFromFlag("initialSize") + public static ConfigKey<Integer> INITIAL_SIZE = new BasicConfigKey<Integer>(Cluster.INITIAL_SIZE, 1); + + @SetFromFlag("controller") + public static BasicAttributeSensorAndConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicAttributeSensorAndConfigKey<KafkaZookeeper>( + KafkaZookeeper.class, "kafkacluster.zookeeper", "Kafka zookeeper for the cluster; if null a default will created"); + + @SetFromFlag("zookeeperSpec") + public static BasicAttributeSensorAndConfigKey<EntitySpec<KafkaZookeeper>> ZOOKEEPER_SPEC = new BasicAttributeSensorAndConfigKey( + EntitySpec.class, "kafkacluster.zookeeperSpec", "Spec for creating the kafka zookeeper"); 
+ + /** Factory to create a Kafka broker, given flags */ + @SetFromFlag("brokerFactory") + public static BasicAttributeSensorAndConfigKey<ConfigurableEntityFactory<KafkaBroker>> BROKER_FACTORY = new BasicAttributeSensorAndConfigKey( + ConfigurableEntityFactory.class, "kafkacluster.brokerFactory", "Factory to create a Kafka broker"); + + /** Spec for Kafka broker entiites to be created */ + @SetFromFlag("brokerSpec") + public static BasicAttributeSensorAndConfigKey<EntitySpec<KafkaBroker>> BROKER_SPEC = new BasicAttributeSensorAndConfigKey( + EntitySpec.class, "kafkacluster.brokerSpec", "Spec for Kafka broker entiites to be created"); + + public static AttributeSensor<DynamicCluster> CLUSTER = new BasicAttributeSensor<DynamicCluster>( + DynamicCluster.class, "kafkacluster.cluster", "Underlying Kafka broker cluster"); + + public static final AttributeSensor<String> HOSTNAME = Attributes.HOSTNAME; + + public KafkaZookeeper getZookeeper(); + + public ConfigurableEntityFactory<KafkaBroker> getBrokerFactory(); + + public DynamicCluster getCluster(); + +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java new file mode 100644 index 0000000..969a140 --- /dev/null +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java @@ -0,0 +1,194 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package brooklyn.entity.messaging.kafka; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.enricher.basic.SensorPropagatingEnricher; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.AbstractEntity; +import brooklyn.entity.basic.ConfigurableEntityFactory; +import brooklyn.entity.basic.Entities; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.BasicEntitySpec; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.proxying.WrappingEntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.event.feed.ConfigToAttributes; +import brooklyn.location.Location; +import brooklyn.util.MutableList; +import brooklyn.util.MutableMap; +import brooklyn.util.exceptions.Exceptions; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +/** + * Implementation of a Kafka cluster containing a {@link KafkaZookeeper} node and a group of {@link KafkaBroker}s. 
+ */ +public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster { + + public static final Logger log = LoggerFactory.getLogger(KafkaClusterImpl.class); + + public KafkaClusterImpl() { + this(MutableMap.of(), null); + } + public KafkaClusterImpl(Map<?, ?> flags) { + this(flags, null); + } + public KafkaClusterImpl(Entity parent) { + this(MutableMap.of(), parent); + } + public KafkaClusterImpl(Map<?, ?> flags, Entity parent) { + super(flags, parent); + setAttribute(SERVICE_UP, false); + } + + @Override + public void postConstruct() { + ConfigToAttributes.apply(this, BROKER_FACTORY); + ConfigToAttributes.apply(this, BROKER_SPEC); + ConfigToAttributes.apply(this, ZOOKEEPER); + ConfigToAttributes.apply(this, ZOOKEEPER_SPEC); + + log.debug("creating zookeeper child for {}", this); + KafkaZookeeper zookeeper = getAttribute(ZOOKEEPER); + if (zookeeper == null) { + EntitySpec<KafkaZookeeper> zookeeperSpec = getAttribute(ZOOKEEPER_SPEC); + if (zookeeperSpec == null) { + log.debug("creating controller using default spec for {}", this); + zookeeperSpec = BasicEntitySpec.newInstance(KafkaZookeeper.class); + setAttribute(ZOOKEEPER_SPEC, zookeeperSpec); + } else { + log.debug("creating controller using custom spec for {}", this); + } + zookeeper = getEntityManager().createEntity(WrappingEntitySpec.newInstance(zookeeperSpec).parent(this)); + if (Entities.isManaged(this)) Entities.manage(zookeeper); + setAttribute(ZOOKEEPER, zookeeper); + } + + log.debug("creating cluster child for {}", this); + ConfigurableEntityFactory<KafkaBroker> brokerFactory = getAttribute(BROKER_FACTORY); + EntitySpec<KafkaBroker> brokerSpec = getAttribute(BROKER_SPEC); + if (brokerFactory == null && brokerSpec == null) { + log.debug("creating default broker spec for {}", this); + brokerSpec = BasicEntitySpec.newInstance(KafkaBroker.class); + setAttribute(BROKER_SPEC, brokerSpec); + } + // Note relies on initial_size being inherited by DynamicCluster, because key id is identical + // We 
add the zookeeper configuration to the KafkaBroker specification or factory here + Map<String,Object> flags; + if (brokerSpec != null) { + flags = MutableMap.<String, Object>of("memberSpec", WrappingEntitySpec.newInstance(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper)); + } else { + brokerFactory.configure(KafkaBroker.ZOOKEEPER, zookeeper); + flags = MutableMap.<String, Object>of("factory", brokerFactory); + } + DynamicCluster cluster = getEntityManager().createEntity(BasicEntitySpec.newInstance(DynamicCluster.class) + .parent(this) + .configure(flags)); + if (Entities.isManaged(this)) Entities.manage(cluster); + setAttribute(CLUSTER, cluster); + } + + @Override + public KafkaZookeeper getZookeeper() { + return getAttribute(ZOOKEEPER); + } + + @Override + public synchronized ConfigurableEntityFactory<KafkaBroker> getBrokerFactory() { + return (ConfigurableEntityFactory<KafkaBroker>) getAttribute(BROKER_FACTORY); + } + + @Override + public synchronized DynamicCluster getCluster() { + return getAttribute(CLUSTER); + } + + @Override + public void start(Collection<? 
extends Location> locations) { + if (isLegacyConstruction()) { + postConstruct(); + } + + if (locations.isEmpty()) locations = this.getLocations(); + Iterables.getOnlyElement(locations); //assert just one + addLocations(locations); + + List<Entity> childrenToStart = MutableList.<Entity>of(getCluster()); + // Set the KafkaZookeeper entity as child of cluster, if it does not already have a parent + if (getZookeeper().getParent() == null) { + addChild(getZookeeper()); + } + // And only start zookeeper if we are parent + if (this.equals(getZookeeper().getParent())) childrenToStart.add(getZookeeper()); + try { + Entities.invokeEffectorList(this, childrenToStart, Startable.START, ImmutableMap.of("locations", locations)).get(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } catch (ExecutionException e) { + throw Exceptions.propagate(e); + } + + connectSensors(); + } + + @Override + public void stop() { + if (this.equals(getZookeeper().getParent())) { + getZookeeper().stop(); + } + getCluster().stop(); + + super.getLocations().clear(); + setAttribute(SERVICE_UP, false); + } + + @Override + public void restart() { + // TODO prod the entities themselves to restart, instead? + Collection<Location> locations = Lists.newArrayList(getLocations()); + + stop(); + start(locations); + } + + void connectSensors() { + SensorPropagatingEnricher.newInstanceListeningToAllSensorsBut(getCluster(), SERVICE_UP) + .addToEntityAndEmitAll(this); + SensorPropagatingEnricher.newInstanceListeningTo(getZookeeper(), SERVICE_UP) + .addToEntityAndEmitAll(this); + } + + @Override + public Integer resize(Integer desiredSize) { + return getCluster().resize(desiredSize); + } + + /** @return the current size of the group. 
*/ + public Integer getCurrentSize() { + return getCluster().getCurrentSize(); + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java new file mode 100644 index 0000000..5f50c6d --- /dev/null +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java @@ -0,0 +1,57 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package brooklyn.entity.messaging.kafka; + +import java.util.Map; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.AbstractEntity; +import brooklyn.entity.messaging.Topic; +import brooklyn.util.MutableMap; + +public class KafkaTopic extends AbstractEntity implements Topic { + + public KafkaTopic() { + super(MutableMap.of(), null); + } + public KafkaTopic(Map properties) { + super(properties, null); + } + public KafkaTopic(Entity parent) { + super(MutableMap.of(), parent); + } + public KafkaTopic(Map properties, Entity parent) { + super(properties, parent); + } + + // kafka:type=kafka.logs.${topicName} + + @Override + public String getTopicName() { + return null; // TODO + } + + @Override + public void create() { + // TODO + } + + @Override + public void delete() { + // TODO + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java new file mode 100644 index 0000000..8e1b5da --- /dev/null +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java @@ -0,0 +1,51 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package brooklyn.entity.messaging.kafka;

import brooklyn.entity.basic.SoftwareProcess;
import brooklyn.entity.java.UsesJmx;
import brooklyn.entity.proxying.ImplementedBy;
import brooklyn.event.basic.BasicAttributeSensor;
import brooklyn.event.basic.BasicConfigKey;
import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
import brooklyn.util.flags.SetFromFlag;

/**
 * An {@link brooklyn.entity.Entity} that represents a single Kafka zookeeper instance.
 */
@ImplementedBy(KafkaZookeeperImpl.class)
public interface KafkaZookeeper extends SoftwareProcess, UsesJmx, Kafka {

    /** Version to install; re-exposes {@link Kafka#SUGGESTED_VERSION} under the {@code version} flag. */
    @SetFromFlag("version")
    BasicConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION;

    /** Zookeeper client port, settable with the {@code zookeeperPort} flag; the default value is {@code "2181+"}. */
    @SetFromFlag("zookeeperPort")
    PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+");

    /** Location of the configuration file template to be copied to the server.*/
    @SetFromFlag("zookeeperConfig")
    BasicConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>(
            String.class, "kafka.config.zookeeper", "Zookeeper configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");

    /** Sensor: outstanding request count. */
    BasicAttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.outstandingRequests", "Outstanding request count");
    /** Sensor: total packets received. */
    BasicAttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.packets.received", "Total packets received");
    /** Sensor: total packets sent. */
    BasicAttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.packets.sent", "Total packets sent");

    /** @return the port the zookeeper listens on. */
    Integer getZookeeperPort();

    /** @return the hostname of the zookeeper. */
    String getHostname();

}
 http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java ---------------------------------------------------------------------- diff --git
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java new file mode 100644 index 0000000..ac1c8fe --- /dev/null +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java @@ -0,0 +1,24 @@
/*
 * Copyright 2013 by Cloudsoft Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package brooklyn.entity.messaging.kafka;

import brooklyn.entity.basic.SoftwareProcessDriver;

/**
 * Driver contract for a {@link KafkaZookeeper}; it is the type returned by
 * {@code KafkaZookeeperImpl.getDriverInterface()}.
 */
public interface KafkaZookeeperDriver extends SoftwareProcessDriver {

    /** @return the port the zookeeper listens on. */
    Integer getZookeeperPort();

}
 http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java new file mode 100644 index 0000000..d941411 --- /dev/null +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java @@ -0,0 +1,128 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package brooklyn.entity.messaging.kafka; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.SoftwareProcessImpl; +import brooklyn.entity.messaging.qpid.QpidBroker; +import brooklyn.event.feed.function.FunctionFeed; +import brooklyn.event.feed.function.FunctionPollConfig; +import brooklyn.event.feed.jmx.JmxAttributePollConfig; +import brooklyn.event.feed.jmx.JmxFeed; +import brooklyn.util.MutableMap; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Objects.ToStringHelper; +import com.google.common.collect.Sets; + +/** + * An {@link brooklyn.entity.Entity} that represents a single Kafka zookeeper instance. 
+ */ +public class KafkaZookeeperImpl extends SoftwareProcessImpl implements KafkaZookeeper { + private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperImpl.class); + + public KafkaZookeeperImpl() { + super(); + } + public KafkaZookeeperImpl(Map<?, ?> properties) { + this(properties, null); + } + public KafkaZookeeperImpl(Entity parent) { + this(MutableMap.of(), parent); + } + public KafkaZookeeperImpl(Map<?, ?> properties, Entity parent) { + super(properties, parent); + } + + @Override + public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); } + + @Override + public String getHostname() { return getAttribute(HOSTNAME); } + + @Override + public Class getDriverInterface() { + return KafkaZookeeperDriver.class; + } + + @Override + protected Collection<Integer> getRequiredOpenPorts() { + Set<Integer> ports = Sets.newLinkedHashSet(super.getRequiredOpenPorts()); + ports.add(getAttribute(ZOOKEEPER_PORT)); + log.debug("getRequiredOpenPorts detected expanded ports {} for {}", ports, this); + return ports; + } + + private volatile FunctionFeed functionFeed; + private volatile JmxFeed jmxFeed; + + @Override + protected void connectSensors() { + String zookeeperMbean = "org.apache.ZooKeeperService:name0=StandaloneServer_port-1"; + + functionFeed = FunctionFeed.builder() + .entity(this) + .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP) + .period(500, TimeUnit.MILLISECONDS) + .callable(new Callable<Boolean>() { + public Boolean call() throws Exception { + return getDriver().isRunning(); + } + }) + .onError(Functions.constant(Boolean.FALSE))) + .build(); + + jmxFeed = JmxFeed.builder() + .entity(this) + .period(500, TimeUnit.MILLISECONDS) + .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS) + .objectName(zookeeperMbean) + .attributeName("OutstandingRequests") + .onError(Functions.constant(-1l))) + .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED) + .objectName(zookeeperMbean) + 
.attributeName("PacketsReceived") + .onError(Functions.constant(-1l))) + .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT) + .objectName(zookeeperMbean) + .attributeName("PacketsSent") + .onError(Functions.constant(-1l))) + .build(); + } + + @Override + public void disconnectSensors() { + super.disconnectSensors(); + if (functionFeed != null) functionFeed.stop(); + if (jmxFeed != null) jmxFeed.stop(); + } + + @Override + protected ToStringHelper toStringHelper() { + return super.toStringHelper().add("zookeeperPort", getZookeeperPort()); + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java new file mode 100644 index 0000000..4310489 --- /dev/null +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java @@ -0,0 +1,121 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package brooklyn.entity.messaging.kafka; + +import static java.lang.String.format; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.lifecycle.CommonCommands; +import brooklyn.entity.drivers.downloads.DownloadResolver; +import brooklyn.entity.java.JavaSoftwareProcessSshDriver; +import brooklyn.location.basic.SshMachineLocation; +import brooklyn.util.MutableMap; +import brooklyn.util.NetworkUtils; + +import com.google.common.collect.ImmutableMap; + +public class KafkaZookeeperSshDriver extends JavaSoftwareProcessSshDriver implements KafkaZookeeperDriver { + + private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperSshDriver.class); + + private String expandedInstallDir; + + public KafkaZookeeperSshDriver(KafkaZookeeperImpl entity, SshMachineLocation machine) { + super(entity, machine); + } + + @Override + protected String getLogFileLocation() { return getRunDir()+"/kafka-log"; } + + @Override + public Integer getZookeeperPort() { return entity.getAttribute(KafkaZookeeper.ZOOKEEPER_PORT); } + + private String getExpandedInstallDir() { + if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called"); + return expandedInstallDir; + } + + @Override + public void install() { + DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this); + List<String> urls = resolver.getTargets(); + String saveAs = resolver.getFilename(); + expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion())); + + List<String> commands = new LinkedList<String>(); + commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs)); + commands.add(CommonCommands.INSTALL_TAR); + commands.add("tar xzfv "+saveAs); + commands.add("cd "+expandedInstallDir); + commands.add("./sbt update"); + 
commands.add("./sbt package"); + + newScript(INSTALLING) + .failOnNonZeroResultCode() + .body.append(commands) + .execute(); + } + + @Override + public void customize() { + NetworkUtils.checkPortsValid(MutableMap.of("zookeeperPort", getZookeeperPort())); + newScript(CUSTOMIZING) + .failOnNonZeroResultCode() + .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir())) + .execute(); + + String serverConfig = entity.getConfig(KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE); + copyTemplate(serverConfig, "zookeeper.properties"); + } + + @Override + public void launch() { + newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING) + .failOnNonZeroResultCode() + .body.append("nohup ./bin/zookeeper-server-start.sh ./zookeeper.properties > console.out 2>&1 &") + .execute(); + } + + public String getPidFile() { return getRunDir() + "/kafka.pid"; } + + @Override + public boolean isRunning() { + return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0; + } + + @Override + public void stop() { + newScript(ImmutableMap.of("usePidFile", false), STOPPING) + .body.append("ps ax | grep quorum\\.QuorumPeerMain | awk '{print $1}' | xargs kill") + .body.append("ps ax | grep quorum\\.QuorumPeerMain | awk '{print $1}' | xargs kill -9") + .execute(); + } + + @Override + public Map<String, String> getShellEnvironment() { + Map<String, String> orig = super.getShellEnvironment(); + return MutableMap.<String, String>builder() + .put("KAFKA_JMX_OPTS", orig.get("JAVA_OPTS")) + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties new file mode 100644 index 
0000000..b440076 --- /dev/null +++ b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties @@ -0,0 +1,120 @@ +[#ftl] +# Copyright 2013 by Cloudsoft Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## +# KafkaBroker configuration template for Brooklyn +# +# see kafka.server.KafkaConfig for additional details and defaults +## + +############################# Server Basics ############################# +# The id of the broker. This must be set to a unique integer for each broker. +brokerid=${entity.brokerId?c} + +# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned +# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost +# may not be what you want. +hostname=${driver.hostname} + + +############################# Socket Server Settings ############################# + +# The port the socket server listens on +port=${entity.kafkaPort?c} + +# The number of processor threads the socket server uses for receiving and answering requests. 
+# Defaults to the number of cores on the machine +num.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer=1048576 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer=1048576 + +# The maximum size of a request that the socket server will accept (protection against OOM) +max.socket.request.bytes=104857600 + + +############################# Log Basics ############################# + +# The directory under which to store log files +log.dir=${driver.runDir}/kafka-logs + +# The number of logical partitions per topic per server. More partitions allow greater parallelism +# for consumption, but also mean more files. +num.partitions=1 + +# Overrides for the default given by num.partitions on a per-topic basis +#topic.partition.count.map=topic1:3, topic2:4 + +############################# Log Flush Policy ############################# + +# The following configurations control the flush of data to disk. This is the most +# important performance knob in kafka. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. +# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency). +# 3. Throughput: The flush is generally the most expensive operation. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +log.flush.interval=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +log.default.flush.interval.ms=1000 + +# Per-topic overrides for log.default.flush.interval.ms +#topic.flush.intervals.ms=topic1:1000, topic2:3000 + +# The interval (in ms) at which logs are checked to see if they need to be flushed to disk. 
+log.default.flush.scheduler.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.size. +#log.retention.size=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.file.size=536870912 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.cleanup.interval.mins=1 + +############################# Zookeeper ############################# + +# Enable connecting to zookeeper +enable.zookeeper=true + +# Zk connection string (see zk docs for details). +# This is a comma separated list of host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. 
+zk.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c} + +# Timeout in ms for connecting to zookeeper +zk.connectiontimeout.ms=1000000 http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties new file mode 100644 index 0000000..4827480 --- /dev/null +++ b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties @@ -0,0 +1,25 @@ +[#ftl] +# Copyright 2013 by Cloudsoft Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## +# KafkaZookeeper configuration template for Brooklyn +## + +# the directory where the snapshot is stored. 
+dataDir=${driver.runDir}/zookeeper +# the port at which the clients will connect +clientPort=${entity.zookeeperPort?c} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy index f50de0c..28ff308 100644 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy @@ -49,7 +49,11 @@ public class ActiveMQIntegrationTest { @AfterMethod(groups = "Integration") public void shutdown() { - if (app != null) Entities.destroyAll(app); + try { + if (app != null) Entities.destroyAll(app); + } catch (Exception e) { + log.warn("Error stopping entities", e); + } } /** http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy new file mode 100644 index 0000000..0943dcd --- /dev/null +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy @@ -0,0 +1,120 @@ +/* + * Copyright 2013 by Cloudsoft Corp. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package brooklyn.entity.messaging.kafka; + +import static brooklyn.test.TestUtils.* +import static java.util.concurrent.TimeUnit.* +import static org.testng.Assert.* + +import java.util.concurrent.TimeUnit + +import javax.jms.Connection +import javax.jms.MessageConsumer +import javax.jms.MessageProducer +import javax.jms.Queue +import javax.jms.Session +import javax.jms.TextMessage + +import org.apache.activemq.ActiveMQConnectionFactory +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.testng.annotations.AfterMethod +import org.testng.annotations.BeforeMethod +import org.testng.annotations.Test + +import brooklyn.entity.basic.ApplicationBuilder +import brooklyn.entity.basic.Entities +import brooklyn.entity.proxying.BasicEntitySpec +import brooklyn.entity.trait.Startable +import brooklyn.location.Location +import brooklyn.location.basic.LocalhostMachineProvisioningLocation +import brooklyn.test.entity.TestApplication +import brooklyn.util.internal.TimeExtras + +/** + * Test the operation of the {@link ActiveMQBroker} class. 
+ */ +public class KafkaIntegrationTest { + private static final Logger log = LoggerFactory.getLogger(KafkaIntegrationTest.class) + + static { TimeExtras.init() } + + private TestApplication app + private Location testLocation + + @BeforeMethod(groups = "Integration") + public void setup() { + app = ApplicationBuilder.builder(TestApplication.class).manage(); + testLocation = new LocalhostMachineProvisioningLocation() + } + + @AfterMethod(groups = "Integration") + public void shutdown() { + if (app != null) Entities.destroyAll(app); + } + + /** + * Test that we can start a zookeeper. + */ + @Test(groups = "Integration") + public void testZookeeper() { + KafkaZookeeper zookeeper = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaZookeeper.class)); + + zookeeper.start([ testLocation ]) + executeUntilSucceedsWithShutdown(zookeeper, timeout:600*TimeUnit.SECONDS) { + assertTrue zookeeper.getAttribute(Startable.SERVICE_UP) + } + assertFalse zookeeper.getAttribute(Startable.SERVICE_UP) + } + + /** + * Test that we can start a broker and zookeeper together. + */ + @Test(groups = "Integration") + public void testBrokerPlusZookeeper() { + KafkaZookeeper zookeeper = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaZookeeper.class)); + KafkaBroker broker = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER, zookeeper)); + + zookeeper.start([ testLocation ]) + executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) { + assertTrue zookeeper.getAttribute(Startable.SERVICE_UP) + } + + broker.start([ testLocation ]) + executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) { + assertTrue broker.getAttribute(Startable.SERVICE_UP) + } + } + + /** + * Test that we can start a cluster with zookeeper and one broker. 
+ */ + @Test(groups = "Integration") + public void testSingleBrokerCluster() { + KafkaCluster cluster = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaCluster.class).configure(KafkaCluster.INITIAL_SIZE, 1)); + + cluster.start([ testLocation ]) + executeUntilSucceedsWithShutdown(cluster, timeout:600*TimeUnit.SECONDS) { + assertTrue cluster.getAttribute(Startable.SERVICE_UP) + Entities.dumpInfo(cluster) + } + assertFalse cluster.getAttribute(Startable.SERVICE_UP) + } + + // TODO test with API sending messages + // TODO test that sensors update + // TODO add demo application +} http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java new file mode 100644 index 0000000..6229b4e --- /dev/null +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2013 by Cloudsoft Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package brooklyn.entity.messaging.kafka;

import brooklyn.entity.AbstractEc2LiveTest;
import brooklyn.location.Location;

/**
 * Live test for Kafka; the test body is not implemented yet.
 */
public class KafkaLiveTest extends AbstractEc2LiveTest {

    /**
     * Test Kafka cluster operation.
     * <p>
     * Not yet implemented: always throws {@link UnsupportedOperationException}.
     */
    @Override
    protected void doTest(Location loc) throws Exception {
        throw new UnsupportedOperationException();
    }

}
