New entity for Apache Kafka messaging

Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/37e890c2
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/37e890c2
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/37e890c2

Branch: refs/heads/0.5.0
Commit: 37e890c2b55ffb46ddc1a1425db9fff2e9b73bb5
Parents: 74e4016
Author: Andrew Kennedy <[email protected]>
Authored: Tue Mar 19 18:16:01 2013 +0000
Committer: Andrew Kennedy <[email protected]>
Committed: Fri Apr 19 10:36:06 2013 +0100

----------------------------------------------------------------------
 .../brooklyn/entity/messaging/kafka/Kafka.java  |  36 ++++
 .../entity/messaging/kafka/KafkaBroker.java     |  67 +++++++
 .../messaging/kafka/KafkaBrokerDriver.java      |  24 +++
 .../entity/messaging/kafka/KafkaBrokerImpl.java | 171 ++++++++++++++++
 .../messaging/kafka/KafkaBrokerSshDriver.java   | 163 ++++++++++++++++
 .../entity/messaging/kafka/KafkaCluster.java    | 124 ++++++++++++
 .../messaging/kafka/KafkaClusterImpl.java       | 194 +++++++++++++++++++
 .../entity/messaging/kafka/KafkaTopic.java      |  57 ++++++
 .../entity/messaging/kafka/KafkaZookeeper.java  |  51 +++++
 .../messaging/kafka/KafkaZookeeperDriver.java   |  24 +++
 .../messaging/kafka/KafkaZookeeperImpl.java     | 128 ++++++++++++
 .../kafka/KafkaZookeeperSshDriver.java          | 121 ++++++++++++
 .../entity/messaging/kafka/server.properties    | 120 ++++++++++++
 .../entity/messaging/kafka/zookeeper.properties |  25 +++
 .../activemq/ActiveMQIntegrationTest.groovy     |   6 +-
 .../messaging/kafka/KafkaIntegrationTest.groovy | 120 ++++++++++++
 .../entity/messaging/kafka/KafkaLiveTest.java   |  31 +++
 17 files changed, 1461 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
new file mode 100644
index 0000000..7f26f8e
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * Shared Kafka broker and zookeeper properties.
+ * <p>
+ * Holds the configuration keys that are declared once and reused by the Kafka
+ * entities in this package: the version to install and the download URL template.
+ */
+public interface Kafka {
+
+    /**
+     * Kafka version to install. Built from {@link SoftwareProcess#SUGGESTED_VERSION} with
+     * {@code 0.7.2-incubating} passed as the second constructor argument (presumably the default value).
+     */
+    BasicConfigKey<String> SUGGESTED_VERSION = new BasicConfigKey<String>(SoftwareProcess.SUGGESTED_VERSION, "0.7.2-incubating");
+
+    /**
+     * URL template for the Kafka source archive. The {@code ${version}} placeholders are kept
+     * verbatim in the value; NOTE(review): presumably substituted when the download is resolved - confirm.
+     */
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
+            Attributes.DOWNLOAD_URL, "http://mirror.catn.com/pub/apache/incubator/kafka/kafka-${version}/kafka-${version}-src.tgz");
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
new file mode 100644
index 0000000..13b8d0d
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.UsesJmx;
+import brooklyn.entity.messaging.MessageBroker;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a single Kafka broker instance.
+ * <p>
+ * Declares the broker's configuration keys (version, port, server configuration template,
+ * zookeeper entity) and the sensors that report its id and its request and byte statistics.
+ */
+@ImplementedBy(KafkaBrokerImpl.class)
+public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Kafka {
+
+    /** Kafka version to install; same key as {@link Kafka#SUGGESTED_VERSION}, exposed under the {@code version} flag. */
+    @SetFromFlag("version")
+    BasicConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION;
+
+    /** Port of the Kafka broker; configured with the port specification {@code 9092+}. */
+    @SetFromFlag("kafkaPort")
+    PortAttributeSensorAndConfigKey KAFKA_PORT = new PortAttributeSensorAndConfigKey("kafka.port", "Kafka port", "9092+");
+
+    /** Location of the configuration file template to be copied to the server.*/
+    @SetFromFlag("serverConfig")
+    BasicConfigKey<String> SERVER_CONFIG_TEMPLATE = new BasicConfigKey<String>(
+            String.class, "kafka.config.server", "Server configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/server.properties");
+
+    /** The {@link KafkaZookeeper} entity associated with this broker; no default value is declared. */
+    @SetFromFlag("zookeeper")
+    BasicConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "Kafka zookeeper entity");
+
+    /** Unique id of this broker. */
+    AttributeSensor<Long> BROKER_ID = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.id", "Kafka unique broker ID");
+
+    // Fetch request statistics
+    BasicAttributeSensor<Long> FETCH_REQUEST_COUNT = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.fetch.total", "Fetch request count");
+    BasicAttributeSensor<Long> TOTAL_FETCH_TIME = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.fetch.time.total", "Total fetch request processing time (millis)");
+    BasicAttributeSensor<Double> MAX_FETCH_TIME = new BasicAttributeSensor<Double>(Double.class, "kafka.broker.fetch.time.max", "Max fetch request processing time (millis)");
+
+    // Produce request statistics
+    BasicAttributeSensor<Long> PRODUCE_REQUEST_COUNT = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.produce.total", "Produce request count");
+    BasicAttributeSensor<Long> TOTAL_PRODUCE_TIME = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.produce.time.total", "Total produce request processing time (millis)");
+    BasicAttributeSensor<Double> MAX_PRODUCE_TIME = new BasicAttributeSensor<Double>(Double.class, "kafka.broker.produce.time.max", "Max produce request processing time (millis)");
+
+    // Network traffic statistics
+    BasicAttributeSensor<Long> BYTES_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.bytes.received", "Total bytes received");
+    BasicAttributeSensor<Long> BYTES_SENT = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.bytes.sent", "Total bytes sent");
+
+    /** @return the value associated with {@link #KAFKA_PORT} */
+    Integer getKafkaPort();
+
+    /** @return the value associated with {@link #BROKER_ID} */
+    Long getBrokerId();
+
+    /** @return the value associated with {@link #ZOOKEEPER} */
+    KafkaZookeeper getZookeeper();
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
new file mode 100644
index 0000000..c9caa03
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerDriver.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.basic.SoftwareProcessDriver;
+
+/**
+ * Driver contract for a Kafka broker process; extends the generic
+ * {@link SoftwareProcessDriver} with access to the broker's port.
+ */
+public interface KafkaBrokerDriver extends SoftwareProcessDriver {
+
+    /** @return the port of the Kafka broker managed by this driver */
+    Integer getKafkaPort();
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
new file mode 100644
index 0000000..d76072e
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.messaging.MessageBroker;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.event.feed.function.FunctionPollConfig;
+import brooklyn.event.feed.jmx.JmxAttributePollConfig;
+import brooklyn.event.feed.jmx.JmxFeed;
+import brooklyn.util.MutableMap;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Objects.ToStringHelper;
+import com.google.common.collect.Sets;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a single Kafka broker instance.
+ * <p>
+ * {@link #postConstruct()} assigns the broker id from a counter shared by every instance of
+ * this class. {@link #connectSensors()} polls {@code SERVICE_UP} from the driver and polls the
+ * request and byte statistics over JMX from the {@code kafka:type=kafka.SocketServerStats} MBean.
+ */
+public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroker, KafkaBroker {
+    private static final Logger log = LoggerFactory.getLogger(KafkaBrokerImpl.class);
+
+    /** Source of broker ids. Static, so ids are only unique among brokers created through this loaded class. */
+    private static final AtomicLong brokers = new AtomicLong(0L);
+
+    public KafkaBrokerImpl() {
+        super();
+    }
+    public KafkaBrokerImpl(Map<?, ?> properties) {
+        this(properties, null);
+    }
+    public KafkaBrokerImpl(Entity parent) {
+        this(MutableMap.of(), parent);
+    }
+    public KafkaBrokerImpl(Map<?, ?> properties, Entity parent) {
+        super(properties, parent);
+    }
+
+    /** Publishes the next value of the shared counter as this broker's {@code BROKER_ID}. */
+    @Override
+    public void postConstruct() {
+        setAttribute(BROKER_ID, brokers.incrementAndGet());
+    }
+
+    @Override
+    public Integer getKafkaPort() { return getAttribute(KAFKA_PORT); }
+
+    @Override
+    public Long getBrokerId() { return getAttribute(BROKER_ID); }
+
+    @Override
+    public KafkaZookeeper getZookeeper() { return getConfig(ZOOKEEPER); }
+
+    /**
+     * Creates a {@link KafkaTopic} with this broker as the second constructor argument,
+     * brings it under management and calls its {@code create()} method.
+     *
+     * @param properties passed unchanged to the {@link KafkaTopic} constructor
+     * @return the newly created topic
+     */
+    public KafkaTopic createTopic(Map properties) {
+        KafkaTopic result = new KafkaTopic(properties, this);
+        Entities.manage(result);
+        result.create();
+        return result;
+    }
+
+    @Override
+    public Class getDriverInterface() {
+        return KafkaBrokerDriver.class;
+    }
+
+    /** Returns the ports required by the superclass plus the current {@code KAFKA_PORT} attribute value. */
+    @Override
+    protected Collection<Integer> getRequiredOpenPorts() {
+        Set<Integer> ports = Sets.newLinkedHashSet(super.getRequiredOpenPorts());
+        ports.add(getAttribute(KAFKA_PORT));
+        log.debug("getRequiredOpenPorts detected expanded ports {} for {}", ports, this);
+        return ports;
+    }
+
+    private volatile FunctionFeed functionFeed;
+    private volatile JmxFeed jmxFeed;
+
+    /**
+     * Starts the two feeds that populate this entity's sensors, both on a 500ms period:
+     * a function feed setting {@code SERVICE_UP} from {@code getDriver().isRunning()}
+     * ({@code false} on error), and a JMX feed reading the statistics attributes
+     * ({@code -1} on error).
+     */
+    @Override
+    protected void connectSensors() {
+        String socketServerStatsMbean = "kafka:type=kafka.SocketServerStats";
+
+        functionFeed = FunctionFeed.builder()
+                .entity(this)
+                .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP)
+                        .period(500, TimeUnit.MILLISECONDS)
+                        .callable(new Callable<Boolean>() {
+                            public Boolean call() throws Exception {
+                                return getDriver().isRunning();
+                            }
+                        })
+                        .onError(Functions.constant(Boolean.FALSE)))
+                .build();
+
+        jmxFeed = JmxFeed.builder()
+                .entity(this)
+                .period(500, TimeUnit.MILLISECONDS)
+                .pollAttribute(new JmxAttributePollConfig<Long>(FETCH_REQUEST_COUNT)
+                        .objectName(socketServerStatsMbean)
+                        .attributeName("NumFetchRequests")
+                        .onError(Functions.constant(-1L)))
+                .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_FETCH_TIME)
+                        .objectName(socketServerStatsMbean)
+                        .attributeName("TotalFetchRequestMs")
+                        .onError(Functions.constant(-1L)))
+                .pollAttribute(new JmxAttributePollConfig<Double>(MAX_FETCH_TIME)
+                        .objectName(socketServerStatsMbean)
+                        .attributeName("MaxFetchRequestMs")
+                        .onError(Functions.constant(-1.0d)))
+                .pollAttribute(new JmxAttributePollConfig<Long>(PRODUCE_REQUEST_COUNT)
+                        .objectName(socketServerStatsMbean)
+                        .attributeName("NumProduceRequests")
+                        .onError(Functions.constant(-1L)))
+                .pollAttribute(new JmxAttributePollConfig<Long>(TOTAL_PRODUCE_TIME)
+                        .objectName(socketServerStatsMbean)
+                        .attributeName("TotalProduceRequestMs")
+                        .onError(Functions.constant(-1L)))
+                .pollAttribute(new JmxAttributePollConfig<Double>(MAX_PRODUCE_TIME)
+                        .objectName(socketServerStatsMbean)
+                        .attributeName("MaxProduceRequestMs")
+                        .onError(Functions.constant(-1.0d)))
+                .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_RECEIVED)
+                        .objectName(socketServerStatsMbean)
+                        .attributeName("TotalBytesRead")
+                        .onError(Functions.constant(-1L)))
+                .pollAttribute(new JmxAttributePollConfig<Long>(BYTES_SENT)
+                        .objectName(socketServerStatsMbean)
+                        .attributeName("TotalBytesWritten")
+                        .onError(Functions.constant(-1L)))
+                .build();
+    }
+
+    /** Delegates to the superclass, then stops whichever feeds were started by {@link #connectSensors()}. */
+    @Override
+    public void disconnectSensors() {
+        super.disconnectSensors();
+        if (functionFeed != null) functionFeed.stop();
+        if (jmxFeed != null) jmxFeed.stop();
+    }
+
+    @Override
+    protected ToStringHelper toStringHelper() {
+        return super.toStringHelper().add("kafkaPort", getKafkaPort());
+    }
+
+    @Override
+    public void setBrokerUrl() {
+        // TODO
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
new file mode 100644
index 0000000..e6c9e4e
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.BrooklynVersion;
+import brooklyn.entity.basic.lifecycle.CommonCommands;
+import brooklyn.entity.drivers.downloads.DownloadResolver;
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.MutableMap;
+import brooklyn.util.NetworkUtils;
+import brooklyn.util.ResourceUtils;
+import brooklyn.util.jmx.jmxrmi.JmxRmiAgent;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * SSH-based driver for a {@link KafkaBroker}: downloads and builds Kafka from source,
+ * copies the build and a templated {@code server.properties} into the run directory,
+ * and starts the broker with a JMX RMI agent attached.
+ */
+public class KafkaBrokerSshDriver extends JavaSoftwareProcessSshDriver implements KafkaBrokerDriver {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaBrokerSshDriver.class);
+
+    /** Directory the source archive unpacks to; set by {@link #install()}. */
+    private String expandedInstallDir;
+
+    public KafkaBrokerSshDriver(KafkaBrokerImpl entity, SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    protected String getLogFileLocation() { return getRunDir()+"/kafka-log"; }
+
+    @Override
+    public Integer getKafkaPort() { return entity.getAttribute(KafkaBroker.KAFKA_PORT); }
+
+    /**
+     * @return the directory computed by {@link #install()}
+     * @throws IllegalStateException if {@link #install()} has not set the directory yet
+     */
+    private String getExpandedInstallDir() {
+        if (expandedInstallDir == null) throw new IllegalStateException("expandedInstallDir is null; most likely install was not called");
+        return expandedInstallDir;
+    }
+
+    /**
+     * Downloads the Kafka source archive, unpacks it and builds it with the bundled
+     * {@code sbt} script ({@code update} then {@code package}). The script fails on any
+     * non-zero exit code.
+     */
+    @Override
+    public void install() {
+        DownloadResolver resolver = entity.getManagementContext().getEntityDownloadsManager().newDownloader(this);
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+        expandedInstallDir = getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion()));
+
+        List<String> commands = new LinkedList<String>();
+        commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs));
+        commands.add(CommonCommands.INSTALL_TAR);
+        commands.add("tar xzfv "+saveAs);
+        commands.add("cd "+expandedInstallDir);
+        commands.add("./sbt update");
+        commands.add("./sbt package");
+
+        newScript(INSTALLING)
+                .failOnNonZeroResultCode()
+                .body.append(commands)
+                .execute();
+    }
+
+    /**
+     * Validates the Kafka port, copies the built installation into the run directory,
+     * writes {@code server.properties} from the configured template and uploads the
+     * JMX RMI agent jar.
+     */
+    @Override
+    public void customize() {
+        NetworkUtils.checkPortsValid(MutableMap.of("kafkaPort", getKafkaPort()));
+        newScript(CUSTOMIZING)
+                .failOnNonZeroResultCode()
+                .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), getRunDir()))
+                .execute();
+
+        String serverConfig = entity.getConfig(KafkaBroker.SERVER_CONFIG_TEMPLATE);
+        copyTemplate(serverConfig, "server.properties");
+
+        // Copy JMX agent Jar to server
+        getMachine().copyTo(new ResourceUtils(this).getResourceFromUrl(getJmxRmiAgentJarUrl()), getJmxRmiAgentJarDestinationFilePath());
+    }
+
+    /** @return the agent jar file name, which embeds the current Brooklyn version */
+    public String getJmxRmiAgentJarBasename() {
+        return "brooklyn-jmxrmi-agent-" + BrooklynVersion.get() + ".jar";
+    }
+
+    /** @return the {@code classpath://} URL the agent jar is read from */
+    public String getJmxRmiAgentJarUrl() {
+        return "classpath://" + getJmxRmiAgentJarBasename();
+    }
+
+    /** @return the path of the agent jar inside the run directory on the target machine */
+    public String getJmxRmiAgentJarDestinationFilePath() {
+        return getRunDir() + "/" + getJmxRmiAgentJarBasename();
+    }
+
+    /** Starts the broker in the background with {@code nohup}, redirecting its output to {@code console.out}. */
+    @Override
+    public void launch() {
+        newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING)
+                .failOnNonZeroResultCode()
+                .body.append("nohup ./bin/kafka-server-start.sh ./server.properties > console.out 2>&1 &")
+                .execute();
+    }
+
+    public String getPidFile() { return getRunDir() + "/kafka.pid"; }
+
+    /** @return {@code true} when the pid-file based check script exits with code 0 */
+    @Override
+    public boolean isRunning() {
+        return newScript(ImmutableMap.of("usePidFile", getPidFile()), CHECK_RUNNING).execute() == 0;
+    }
+
+    /**
+     * Kills the processes whose {@code ps ax} line matches {@code kafka\.Kafka}, first with
+     * the default signal and then with {@code -9}. The pid file is not used here, and the
+     * script result code is not checked.
+     */
+    @Override
+    public void stop() {
+        // NOTE(review): the grep pattern is not anchored to this entity, so it looks like
+        // any Kafka broker on the machine would be matched - confirm this is intended.
+        newScript(ImmutableMap.of("usePidFile", false), STOPPING)
+                .body.append("ps ax | grep kafka\\.Kafka | awk '{print $1}' | xargs kill")
+                .body.append("ps ax | grep kafka\\.Kafka | awk '{print $1}' | xargs kill -9")
+                .execute();
+    }
+
+    /** System properties configuring the JMX RMI agent ports, with SSL and authentication disabled. */
+    @Override
+    protected Map<String, ?> getJmxJavaSystemProperties() {
+        return MutableMap.<String, Object> builder()
+                .put(JmxRmiAgent.JMX_SERVER_PORT_PROPERTY, getJmxPort())
+                .put(JmxRmiAgent.RMI_REGISTRY_PORT_PROPERTY, getRmiServerPort())
+                .put("com.sun.management.jmxremote.ssl", false)
+                .put("com.sun.management.jmxremote.authenticate", false)
+                .put("java.rmi.server.hostname", getHostname())
+                .build();
+    }
+
+    /** JVM option that loads the uploaded JMX RMI agent jar as a Java agent. */
+    @Override
+    protected List<String> getJmxJavaConfigOptions() {
+        return ImmutableList.of("-javaagent:" + getJmxRmiAgentJarDestinationFilePath());
+    }
+
+    /**
+     * Use RMI agent to provide JMX.
+     * <p>
+     * Takes the {@code JAVA_OPTS} entry out of the inherited environment and re-publishes
+     * its value as {@code KAFKA_JMX_OPTS}.
+     */
+    @Override
+    public Map<String, String> getShellEnvironment() {
+        // NOTE(review): this removes the entry from the map returned by super, so it
+        // assumes that map is mutable and contains JAVA_OPTS - confirm.
+        Map<String, String> orig = super.getShellEnvironment();
+        String kafkaJmxOpts = orig.remove("JAVA_OPTS");
+        return MutableMap.<String, String>builder()
+                .putAll(orig)
+                .put("KAFKA_JMX_OPTS", kafkaJmxOpts)
+                .build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
new file mode 100644
index 0000000..45843f3
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaCluster.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.ConfigurableEntityFactory;
+import brooklyn.entity.group.Cluster;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.BasicEntitySpec;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.entity.trait.Resizable;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * This entity contains the sub-groups and entities that go in to a single location (e.g. datacenter)
+ * to provide Kafka cluster functionality.
+ * <p>
+ * You can customise the broker by customising the factory (by reference in calling code)
+ * or supplying your own factory (as a config flag).
+ * <p>
+ * The contents of this group entity are:
+ * <ul>
+ * <li>a {@link brooklyn.entity.group.DynamicCluster} of {@link KafkaBroker}s
+ * <li>a {@link KafkaZookeeper}
+ * <li>a {@link brooklyn.policy.Policy} to resize the DynamicCluster
+ * </ul>
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+@Catalog(name="Kafka", description="Apache Kafka is a distributed publish-subscribe messaging system")
+@ImplementedBy(KafkaClusterImpl.class)
+public interface KafkaCluster extends Entity, Startable, Resizable  {
+
+    /**
+     * Fluent spec for building a {@link KafkaCluster}; each setter stores its argument
+     * under the matching config key and returns the spec itself for chaining.
+     */
+    public static class Spec<T extends KafkaCluster, S extends Spec<T,S>> extends BasicEntitySpec<T,S> {
+
+        private static class ConcreteSpec extends Spec<KafkaCluster, ConcreteSpec> {
+            ConcreteSpec() {
+                super(KafkaCluster.class);
+            }
+        }
+
+        /** @return a new spec for the {@link KafkaCluster} type */
+        public static Spec<KafkaCluster, ?> newInstance() {
+            return new ConcreteSpec();
+        }
+
+        protected Spec(Class<T> type) {
+            super(type);
+        }
+
+        /**
+         * Sets the initial number of brokers.
+         *
+         * @param val value stored under {@link KafkaCluster#INITIAL_SIZE}
+         */
+        public S initialSize(int val) {
+            // Previously the literal 1 was configured here, so the argument was ignored.
+            configure(INITIAL_SIZE, val);
+            return self();
+        }
+
+        /** @param val zookeeper entity stored under {@link KafkaCluster#ZOOKEEPER} */
+        public S zookeeper(KafkaZookeeper val) {
+            configure(ZOOKEEPER, val);
+            return self();
+        }
+
+        /** @param val broker spec stored under {@link KafkaCluster#BROKER_SPEC} */
+        public S brokerSpec(EntitySpec<KafkaBroker> val) {
+            configure(BROKER_SPEC, val);
+            return self();
+        }
+
+        /** @param val broker factory stored under {@link KafkaCluster#BROKER_FACTORY} */
+        public S brokerFactory(ConfigurableEntityFactory<KafkaBroker> val) {
+            configure(BROKER_FACTORY, val);
+            return self();
+        }
+    }
+
+    /** Initial cluster size; built from {@link Cluster#INITIAL_SIZE} with {@code 1} as the second constructor argument. */
+    @SetFromFlag("initialSize")
+    public static ConfigKey<Integer> INITIAL_SIZE = new BasicConfigKey<Integer>(Cluster.INITIAL_SIZE, 1);
+
+    /** Zookeeper entity for the cluster. */
+    // NOTE(review): the flag name "controller" does not match this key and looks like a
+    // copy-paste slip ("zookeeper"?); left unchanged because the flag name is externally visible.
+    @SetFromFlag("controller")
+    public static BasicAttributeSensorAndConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicAttributeSensorAndConfigKey<KafkaZookeeper>(
+            KafkaZookeeper.class, "kafkacluster.zookeeper", "Kafka zookeeper for the cluster; if null a default will created");
+
+    /** Spec for creating the Kafka zookeeper */
+    @SetFromFlag("zookeeperSpec")
+    public static BasicAttributeSensorAndConfigKey<EntitySpec<KafkaZookeeper>> ZOOKEEPER_SPEC = new BasicAttributeSensorAndConfigKey(
+            EntitySpec.class, "kafkacluster.zookeeperSpec", "Spec for creating the kafka zookeeper");
+
+    /** Factory to create a Kafka broker, given flags */
+    @SetFromFlag("brokerFactory")
+    public static BasicAttributeSensorAndConfigKey<ConfigurableEntityFactory<KafkaBroker>> BROKER_FACTORY = new BasicAttributeSensorAndConfigKey(
+            ConfigurableEntityFactory.class, "kafkacluster.brokerFactory", "Factory to create a Kafka broker");
+
+    /** Spec for Kafka broker entities to be created */
+    @SetFromFlag("brokerSpec")
+    public static BasicAttributeSensorAndConfigKey<EntitySpec<KafkaBroker>> BROKER_SPEC = new BasicAttributeSensorAndConfigKey(
+            EntitySpec.class, "kafkacluster.brokerSpec", "Spec for Kafka broker entiites to be created");
+
+    /** The underlying {@link DynamicCluster} of brokers. */
+    public static AttributeSensor<DynamicCluster> CLUSTER = new BasicAttributeSensor<DynamicCluster>(
+            DynamicCluster.class, "kafkacluster.cluster", "Underlying Kafka broker cluster");
+
+    public static final AttributeSensor<String> HOSTNAME = Attributes.HOSTNAME;
+
+    /** @return the zookeeper entity of this cluster */
+    public KafkaZookeeper getZookeeper();
+
+    /** @return the factory used to create brokers */
+    public ConfigurableEntityFactory<KafkaBroker> getBrokerFactory();
+
+    /** @return the underlying broker cluster */
+    public DynamicCluster getCluster();
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
new file mode 100644
index 0000000..969a140
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaClusterImpl.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.enricher.basic.SensorPropagatingEnricher;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.ConfigurableEntityFactory;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.BasicEntitySpec;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.proxying.WrappingEntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.feed.ConfigToAttributes;
+import brooklyn.location.Location;
+import brooklyn.util.MutableList;
+import brooklyn.util.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Implementation of a Kafka cluster containing a {@link KafkaZookeeper} node and a group of {@link KafkaBroker}s.
+ * <p>
+ * The zookeeper node and a {@link DynamicCluster} of brokers are created in {@link #postConstruct()};
+ * {@link #start(Collection)}, {@link #stop()} and {@link #resize(Integer)} delegate to those entities.
+ */
+public class KafkaClusterImpl extends AbstractEntity implements KafkaCluster {
+
+    public static final Logger log = LoggerFactory.getLogger(KafkaClusterImpl.class);
+
+    public KafkaClusterImpl() {
+        this(MutableMap.of(), null);
+    }
+    public KafkaClusterImpl(Map<?, ?> flags) {
+        this(flags, null);
+    }
+    public KafkaClusterImpl(Entity parent) {
+        this(MutableMap.of(), parent);
+    }
+    public KafkaClusterImpl(Map<?, ?> flags, Entity parent) {
+        super(flags, parent);
+        setAttribute(SERVICE_UP, false);
+    }
+
+    /**
+     * Copies the broker/zookeeper configuration to attributes, then creates the zookeeper
+     * (unless one was supplied) and the broker cluster, recording both as attributes.
+     */
+    @Override
+    public void postConstruct() {
+        ConfigToAttributes.apply(this, BROKER_FACTORY);
+        ConfigToAttributes.apply(this, BROKER_SPEC);
+        ConfigToAttributes.apply(this, ZOOKEEPER);
+        ConfigToAttributes.apply(this, ZOOKEEPER_SPEC);
+
+        log.debug("creating zookeeper child for {}", this);
+        KafkaZookeeper zookeeper = getAttribute(ZOOKEEPER);
+        if (zookeeper == null) {
+            EntitySpec<KafkaZookeeper> zookeeperSpec = getAttribute(ZOOKEEPER_SPEC);
+            if (zookeeperSpec == null) {
+                log.debug("creating zookeeper using default spec for {}", this);
+                zookeeperSpec = BasicEntitySpec.newInstance(KafkaZookeeper.class);
+                setAttribute(ZOOKEEPER_SPEC, zookeeperSpec);
+            } else {
+                log.debug("creating zookeeper using custom spec for {}", this);
+            }
+            zookeeper = getEntityManager().createEntity(WrappingEntitySpec.newInstance(zookeeperSpec).parent(this));
+            if (Entities.isManaged(this)) Entities.manage(zookeeper);
+            setAttribute(ZOOKEEPER, zookeeper);
+        }
+
+        log.debug("creating cluster child for {}", this);
+        ConfigurableEntityFactory<KafkaBroker> brokerFactory = getAttribute(BROKER_FACTORY);
+        EntitySpec<KafkaBroker> brokerSpec = getAttribute(BROKER_SPEC);
+        if (brokerFactory == null && brokerSpec == null) {
+            log.debug("creating default broker spec for {}", this);
+            brokerSpec = BasicEntitySpec.newInstance(KafkaBroker.class);
+            setAttribute(BROKER_SPEC, brokerSpec);
+        }
+        // Note relies on initial_size being inherited by DynamicCluster, because key id is identical
+        // We add the zookeeper configuration to the KafkaBroker specification or factory here
+        Map<String,Object> flags;
+        if (brokerSpec != null) {
+            // A spec takes precedence over a factory when both are available.
+            flags = MutableMap.<String, Object>of("memberSpec", WrappingEntitySpec.newInstance(brokerSpec).configure(KafkaBroker.ZOOKEEPER, zookeeper));
+        } else {
+            // brokerFactory cannot be null here: a default spec is created above when both are missing.
+            brokerFactory.configure(KafkaBroker.ZOOKEEPER, zookeeper);
+            flags = MutableMap.<String, Object>of("factory", brokerFactory);
+        }
+        DynamicCluster cluster = getEntityManager().createEntity(BasicEntitySpec.newInstance(DynamicCluster.class)
+                .parent(this)
+                .configure(flags));
+        if (Entities.isManaged(this)) Entities.manage(cluster);
+        setAttribute(CLUSTER, cluster);
+    }
+
+    /** @return the zookeeper entity recorded in the {@code ZOOKEEPER} attribute; may be null before {@link #postConstruct()}. */
+    @Override
+    public KafkaZookeeper getZookeeper() {
+        return getAttribute(ZOOKEEPER);
+    }
+
+    /** @return the broker factory recorded in the {@code BROKER_FACTORY} attribute, or null if a spec is used instead. */
+    @Override
+    public synchronized ConfigurableEntityFactory<KafkaBroker> getBrokerFactory() {
+        return getAttribute(BROKER_FACTORY);
+    }
+
+    /** @return the broker cluster recorded in the {@code CLUSTER} attribute; may be null before {@link #postConstruct()}. */
+    @Override
+    public synchronized DynamicCluster getCluster() {
+        return getAttribute(CLUSTER);
+    }
+
+    /**
+     * Starts the broker cluster and, when this entity is its parent, the zookeeper, in the single given location.
+     *
+     * @param locations exactly one location; if empty, the locations already set on this entity are used
+     */
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        if (isLegacyConstruction()) {
+            postConstruct();
+        }
+
+        if (locations.isEmpty()) locations = this.getLocations();
+        Iterables.getOnlyElement(locations); //assert just one
+        addLocations(locations);
+
+        List<Entity> childrenToStart = MutableList.<Entity>of(getCluster());
+        // Set the KafkaZookeeper entity as child of cluster, if it does not already have a parent
+        if (getZookeeper().getParent() == null) {
+            addChild(getZookeeper());
+        }
+        // And only start zookeeper if we are parent
+        if (this.equals(getZookeeper().getParent())) childrenToStart.add(getZookeeper());
+        try {
+            Entities.invokeEffectorList(this, childrenToStart, Startable.START, ImmutableMap.of("locations", locations)).get();
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        } catch (ExecutionException e) {
+            throw Exceptions.propagate(e);
+        }
+
+        connectSensors();
+    }
+
+    /**
+     * Stops the zookeeper (only when this entity is its parent) and then the broker cluster.
+     * <p>
+     * The broker cluster is stopped, the locations cleared and {@code SERVICE_UP} reset even if
+     * stopping the zookeeper fails; the failure is still propagated to the caller. Children that
+     * were never created are skipped rather than causing a {@link NullPointerException}.
+     */
+    @Override
+    public void stop() {
+        KafkaZookeeper zookeeper = getZookeeper();
+        DynamicCluster cluster = getCluster();
+        try {
+            if (zookeeper != null && this.equals(zookeeper.getParent())) {
+                zookeeper.stop();
+            }
+        } finally {
+            try {
+                if (cluster != null) {
+                    cluster.stop();
+                }
+            } finally {
+                super.getLocations().clear();
+                setAttribute(SERVICE_UP, false);
+            }
+        }
+    }
+
+    /** Restarts by calling {@link #stop()} followed by {@link #start(Collection)} with the current locations. */
+    @Override
+    public void restart() {
+        // TODO prod the entities themselves to restart, instead?
+        // Copy first: stop() clears the location collection.
+        Collection<Location> locations = Lists.newArrayList(getLocations());
+
+        stop();
+        start(locations);
+    }
+
+    /**
+     * Propagates all sensors except {@code SERVICE_UP} from the broker cluster, and
+     * {@code SERVICE_UP} from the zookeeper, onto this entity.
+     */
+    void connectSensors() {
+        SensorPropagatingEnricher.newInstanceListeningToAllSensorsBut(getCluster(), SERVICE_UP)
+                .addToEntityAndEmitAll(this);
+        SensorPropagatingEnricher.newInstanceListeningTo(getZookeeper(), SERVICE_UP)
+                .addToEntityAndEmitAll(this);
+    }
+
+    /** Delegates to the underlying broker cluster. */
+    @Override
+    public Integer resize(Integer desiredSize) {
+        return getCluster().resize(desiredSize);
+    }
+
+    /** @return the current size of the group. */
+    public Integer getCurrentSize() {
+        return getCluster().getCurrentSize();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java
new file mode 100644
index 0000000..5f50c6d
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import java.util.Map;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.messaging.Topic;
+import brooklyn.util.MutableMap;
+
+/**
+ * Placeholder {@link Topic} entity for a Kafka topic.
+ * <p>
+ * Topic management is not implemented yet: {@link #getTopicName()} returns {@code null} and
+ * {@link #create()} / {@link #delete()} do nothing.
+ */
+public class KafkaTopic extends AbstractEntity implements Topic {
+
+    public KafkaTopic() {
+        this(MutableMap.of(), null);
+    }
+    public KafkaTopic(Map<?, ?> properties) {
+        this(properties, null);
+    }
+    public KafkaTopic(Entity parent) {
+        this(MutableMap.of(), parent);
+    }
+    public KafkaTopic(Map<?, ?> properties, Entity parent) {
+        super(properties, parent);
+    }
+
+    // kafka:type=kafka.logs.${topicName}
+
+    /** @return always {@code null} for now; callers must not rely on a name being available. */
+    @Override
+    public String getTopicName() {
+        return null; // TODO
+    }
+
+    /** Not implemented yet; currently a no-op. */
+    @Override
+    public void create() {
+        // TODO
+    }
+
+    /** Not implemented yet; currently a no-op. */
+    @Override
+    public void delete() {
+        // TODO
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
new file mode 100644
index 0000000..8e1b5da
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.UsesJmx;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a single Kafka zookeeper instance.
+ * <p>
+ * Declares the configuration keys and sensors of the zookeeper node; the implementation
+ * class is named by the {@code @ImplementedBy} annotation below.
+ */
+@ImplementedBy(KafkaZookeeperImpl.class)
+public interface KafkaZookeeper extends SoftwareProcess, UsesJmx, Kafka {
+    /** Version key, re-exported from {@link Kafka#SUGGESTED_VERSION} and settable with the {@code version} flag. */
+    @SetFromFlag("version")
+    BasicConfigKey<String> SUGGESTED_VERSION = Kafka.SUGGESTED_VERSION;
+    /** Zookeeper port, settable with the {@code zookeeperPort} flag; default {@code "2181+"} (presumably 2181 or the next free port - confirm against the port-range syntax). */
+    @SetFromFlag("zookeeperPort")
+    PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new 
PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+");
+
+    /** Location of the configuration file template to be copied to the server; defaults to the bundled zookeeper.properties on the classpath. */
+    @SetFromFlag("zookeeperConfig")
+    BasicConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = new 
BasicConfigKey<String>(
+            String.class, "kafka.config.zookeeper", "Zookeeper configuration 
template (in freemarker format)", 
"classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");
+    /* Sensors reporting request and packet counters for the zookeeper server. */
+    BasicAttributeSensor<Long> OUTSTANDING_REQUESTS = new 
BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.outstandingRequests", 
"Outstanding request count");
+    BasicAttributeSensor<Long> PACKETS_RECEIVED = new 
BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.packets.received", 
"Total packets received");
+    BasicAttributeSensor<Long> PACKETS_SENT = new 
BasicAttributeSensor<Long>(Long.class, "kafka.zookeeper.packets.sent", "Total 
packets sent");
+    /** @return the zookeeper port; {@link KafkaZookeeperImpl} reads it from the {@link #ZOOKEEPER_PORT} attribute. */
+    Integer getZookeeperPort();
+    /** @return the host name; {@link KafkaZookeeperImpl} reads it from the {@code HOSTNAME} attribute. */
+    String getHostname();
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java
new file mode 100644
index 0000000..ac1c8fe
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperDriver.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.basic.SoftwareProcessDriver;
+/**
+ * Driver contract for a {@link KafkaZookeeper} process, adding access to the zookeeper port
+ * on top of the generic {@link SoftwareProcessDriver} operations.
+ */
+public interface KafkaZookeeperDriver extends SoftwareProcessDriver {
+    /** @return the port used by the zookeeper process managed by this driver. */
+    Integer getZookeeperPort();
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
new file mode 100644
index 0000000..d941411
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperImpl.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.messaging.qpid.QpidBroker;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.event.feed.function.FunctionPollConfig;
+import brooklyn.event.feed.jmx.JmxAttributePollConfig;
+import brooklyn.event.feed.jmx.JmxFeed;
+import brooklyn.util.MutableMap;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Objects.ToStringHelper;
+import com.google.common.collect.Sets;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a single Kafka zookeeper instance.
+ * <p>
+ * Service-up is derived from the driver's running check and the request/packet sensors
+ * are read over JMX; see {@link #connectSensors()}.
+ */
+public class KafkaZookeeperImpl extends SoftwareProcessImpl implements KafkaZookeeper {
+    private static final Logger log = LoggerFactory.getLogger(KafkaZookeeperImpl.class);
+
+    public KafkaZookeeperImpl() {
+        super();
+    }
+    public KafkaZookeeperImpl(Map<?, ?> properties) {
+        this(properties, null);
+    }
+    public KafkaZookeeperImpl(Entity parent) {
+        this(MutableMap.of(), parent);
+    }
+    public KafkaZookeeperImpl(Map<?, ?> properties, Entity parent) {
+        super(properties, parent);
+    }
+
+    /** @return the value of the {@code ZOOKEEPER_PORT} attribute; null if it has not been set. */
+    @Override
+    public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); }
+
+    /** @return the value of the {@code HOSTNAME} attribute; null if it has not been set. */
+    @Override
+    public String getHostname() { return getAttribute(HOSTNAME); }
+
+    @Override
+    public Class getDriverInterface() {
+        return KafkaZookeeperDriver.class;
+    }
+
+    /**
+     * Adds the zookeeper port to the ports required by the superclass.
+     * <p>
+     * The port is only added when the attribute is set, so the returned collection
+     * never contains a null element on account of this method.
+     */
+    @Override
+    protected Collection<Integer> getRequiredOpenPorts() {
+        Set<Integer> ports = Sets.newLinkedHashSet(super.getRequiredOpenPorts());
+        Integer zookeeperPort = getAttribute(ZOOKEEPER_PORT);
+        if (zookeeperPort != null) {
+            ports.add(zookeeperPort);
+        }
+        log.debug("getRequiredOpenPorts detected expanded ports {} for {}", ports, this);
+        return ports;
+    }
+
+    private volatile FunctionFeed functionFeed;
+    private volatile JmxFeed jmxFeed;
+
+    /**
+     * Starts the feeds that populate this entity's sensors: {@code SERVICE_UP} is polled
+     * every 500ms from the driver's running check (false on error), and the three zookeeper
+     * counters are polled over JMX every 500ms (-1 on error).
+     */
+    @Override
+    protected void connectSensors() {
+        String zookeeperMbean = "org.apache.ZooKeeperService:name0=StandaloneServer_port-1";
+
+        functionFeed = FunctionFeed.builder()
+                .entity(this)
+                .poll(new FunctionPollConfig<Object, Boolean>(SERVICE_UP)
+                        .period(500, TimeUnit.MILLISECONDS)
+                        .callable(new Callable<Boolean>() {
+                            @Override
+                            public Boolean call() throws Exception {
+                                return getDriver().isRunning();
+                            }
+                        })
+                        .onError(Functions.constant(Boolean.FALSE)))
+                .build();
+
+        jmxFeed = JmxFeed.builder()
+                .entity(this)
+                .period(500, TimeUnit.MILLISECONDS)
+                .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS)
+                        .objectName(zookeeperMbean)
+                        .attributeName("OutstandingRequests")
+                        .onError(Functions.constant(-1L)))
+                .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED)
+                        .objectName(zookeeperMbean)
+                        .attributeName("PacketsReceived")
+                        .onError(Functions.constant(-1L)))
+                .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT)
+                        .objectName(zookeeperMbean)
+                        .attributeName("PacketsSent")
+                        .onError(Functions.constant(-1L)))
+                .build();
+    }
+
+    /** Stops the feeds started by {@link #connectSensors()}, if any were created. */
+    @Override
+    public void disconnectSensors() {
+        super.disconnectSensors();
+        if (functionFeed != null) functionFeed.stop();
+        if (jmxFeed != null) jmxFeed.stop();
+    }
+
+    @Override
+    protected ToStringHelper toStringHelper() {
+        return super.toStringHelper().add("zookeeperPort", getZookeeperPort());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
new file mode 100644
index 0000000..4310489
--- /dev/null
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeperSshDriver.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import static java.lang.String.format;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.lifecycle.CommonCommands;
+import brooklyn.entity.drivers.downloads.DownloadResolver;
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.MutableMap;
+import brooklyn.util.NetworkUtils;
+
+import com.google.common.collect.ImmutableMap;
+
+public class KafkaZookeeperSshDriver extends JavaSoftwareProcessSshDriver 
implements KafkaZookeeperDriver {
+
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaZookeeperSshDriver.class);
+
+    private String expandedInstallDir;
+
+    public KafkaZookeeperSshDriver(KafkaZookeeperImpl entity, 
SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    protected String getLogFileLocation() { return getRunDir()+"/kafka-log"; }
+
+    @Override
+    public Integer getZookeeperPort() { return 
entity.getAttribute(KafkaZookeeper.ZOOKEEPER_PORT); }
+
+    private String getExpandedInstallDir() {
+        if (expandedInstallDir == null) throw new 
IllegalStateException("expandedInstallDir is null; most likely install was not 
called");
+        return expandedInstallDir;
+    }
+
+    @Override
+    public void install() {
+        DownloadResolver resolver = 
entity.getManagementContext().getEntityDownloadsManager().newDownloader(this);
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+        expandedInstallDir = 
getInstallDir()+"/"+resolver.getUnpackedDirectoryName(format("kafka-%s-src", 
getVersion()));
+
+        List<String> commands = new LinkedList<String>();
+        commands.addAll(CommonCommands.downloadUrlAs(urls, saveAs));
+        commands.add(CommonCommands.INSTALL_TAR);
+        commands.add("tar xzfv "+saveAs);
+        commands.add("cd "+expandedInstallDir);
+        commands.add("./sbt update");
+        commands.add("./sbt package");
+
+        newScript(INSTALLING)
+                .failOnNonZeroResultCode()
+                .body.append(commands)
+                .execute();
+    }
+
+    @Override
+    public void customize() {
+        NetworkUtils.checkPortsValid(MutableMap.of("zookeeperPort", 
getZookeeperPort()));
+        newScript(CUSTOMIZING)
+                .failOnNonZeroResultCode()
+                .body.append(format("cp -R %s/* %s", getExpandedInstallDir(), 
getRunDir()))
+                .execute();
+
+        String serverConfig = 
entity.getConfig(KafkaZookeeper.ZOOKEEPER_CONFIG_TEMPLATE);
+        copyTemplate(serverConfig, "zookeeper.properties");
+    }
+
+    @Override
+    public void launch() {
+        newScript(ImmutableMap.of("usePidFile", getPidFile()), LAUNCHING)
+                .failOnNonZeroResultCode()
+                .body.append("nohup ./bin/zookeeper-server-start.sh 
./zookeeper.properties > console.out 2>&1 &")
+                .execute();
+    }
+
+    public String getPidFile() { return getRunDir() + "/kafka.pid"; }
+
+    @Override
+    public boolean isRunning() {
+        return newScript(ImmutableMap.of("usePidFile", getPidFile()), 
CHECK_RUNNING).execute() == 0;
+    }
+
+    @Override
+    public void stop() {
+        newScript(ImmutableMap.of("usePidFile", false), STOPPING)
+                .body.append("ps ax | grep quorum\\.QuorumPeerMain | awk 
'{print $1}' | xargs kill")
+                .body.append("ps ax | grep quorum\\.QuorumPeerMain | awk 
'{print $1}' | xargs kill -9")
+                .execute();
+    }
+
+    @Override
+    public Map<String, String> getShellEnvironment() {
+        Map<String, String> orig = super.getShellEnvironment();
+        return MutableMap.<String, String>builder()
+                .put("KAFKA_JMX_OPTS", orig.get("JAVA_OPTS"))
+                .build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
 
b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
new file mode 100644
index 0000000..b440076
--- /dev/null
+++ 
b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
@@ -0,0 +1,120 @@
+[#ftl]
+# Copyright 2013 by Cloudsoft Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+##
+# KafkaBroker configuration template for Brooklyn
+#
+# see kafka.server.KafkaConfig for additional details and defaults
+##
+
+############################# Server Basics #############################
+# The id of the broker. This must be set to a unique integer for each broker.
+brokerid=${entity.brokerId?c}
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use 
the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces 
getLocalHost
+# may not be what you want.
+hostname=${driver.hostname}
+
+
+############################# Socket Server Settings 
#############################
+
+# The port the socket server listens on
+port=${entity.kafkaPort?c}
+
+# The number of processor threads the socket server uses for receiving and 
answering requests. 
+# Defaults to the number of cores on the machine
+num.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer=1048576
+
+# The maximum size of a request that the socket server will accept (protection 
against OOM)
+max.socket.request.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=${driver.runDir}/kafka-logs
+
+# The number of logical partitions per topic per server. More partitions allow 
greater parallelism
+# for consumption, but also mean more files.
+num.partitions=1
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the 
most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of 
a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed 
(which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data 
after a period of time or
+# every N messages (or both). This can be done globally and overridden on a 
per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.default.flush.interval.ms=1000
+
+# Per-topic overrides for log.default.flush.interval.ms
+#topic.flush.intervals.ms=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be 
flushed to disk.
+log.default.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy 
#############################
+
+# The following configurations control the disposal of log segments. The 
policy can
+# be set to delete segments after a period of time, or after a given size has 
accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. 
Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as 
long as the remaining
+# segments don't drop below log.retention.size.
+#log.retention.size=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log 
segment will be created.
+log.file.size=536870912
+
+# The interval at which log segments are checked to see if they can be deleted 
according 
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zk.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
+
+# Timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties
 
b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties
new file mode 100644
index 0000000..4827480
--- /dev/null
+++ 
b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties
@@ -0,0 +1,25 @@
+[#ftl]
+# Copyright 2013 by Cloudsoft Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+##
+# KafkaZookeeper configuration template for Brooklyn
+##
+
+# the directory where the snapshot is stored.
+dataDir=${driver.runDir}/zookeeper
+# the port at which the clients will connect
+clientPort=${entity.zookeeperPort?c}
+# disable the per-ip limit on the number of connections since this is a 
non-production config
+maxClientCnxns=0

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
 
b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
index f50de0c..28ff308 100644
--- 
a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
+++ 
b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.groovy
@@ -49,7 +49,11 @@ public class ActiveMQIntegrationTest {
 
     @AfterMethod(groups = "Integration")
     public void shutdown() {
-        if (app != null) Entities.destroyAll(app);
+        try {
+            if (app != null) Entities.destroyAll(app);
+        } catch (Exception e) {
+            log.warn("Error stopping entities", e);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
 
b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
new file mode 100644
index 0000000..0943dcd
--- /dev/null
+++ 
b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import static brooklyn.test.TestUtils.*
+import static java.util.concurrent.TimeUnit.*
+import static org.testng.Assert.*
+
+import java.util.concurrent.TimeUnit
+
+import javax.jms.Connection
+import javax.jms.MessageConsumer
+import javax.jms.MessageProducer
+import javax.jms.Queue
+import javax.jms.Session
+import javax.jms.TextMessage
+
+import org.apache.activemq.ActiveMQConnectionFactory
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import org.testng.annotations.AfterMethod
+import org.testng.annotations.BeforeMethod
+import org.testng.annotations.Test
+
+import brooklyn.entity.basic.ApplicationBuilder
+import brooklyn.entity.basic.Entities
+import brooklyn.entity.proxying.BasicEntitySpec
+import brooklyn.entity.trait.Startable
+import brooklyn.location.Location
+import brooklyn.location.basic.LocalhostMachineProvisioningLocation
+import brooklyn.test.entity.TestApplication
+import brooklyn.util.internal.TimeExtras
+
+/**
+ * Test the operation of the {@link KafkaZookeeper}, {@link KafkaBroker} and {@link KafkaCluster} entities.
+ */
+public class KafkaIntegrationTest {
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaIntegrationTest.class)
+
+    static { TimeExtras.init() }
+
+    private TestApplication app
+    private Location testLocation
+
+    @BeforeMethod(groups = "Integration")
+    public void setup() {
+        app = ApplicationBuilder.builder(TestApplication.class).manage();
+        testLocation = new LocalhostMachineProvisioningLocation()
+    }
+
+    @AfterMethod(groups = "Integration")
+    public void shutdown() {
+        if (app != null) Entities.destroyAll(app);
+    }
+
+    /**
+     * Test that we can start a zookeeper.
+     */
+    @Test(groups = "Integration")
+    public void testZookeeper() {
+        KafkaZookeeper zookeeper = 
app.createAndManageChild(BasicEntitySpec.newInstance(KafkaZookeeper.class));
+
+        zookeeper.start([ testLocation ])
+        executeUntilSucceedsWithShutdown(zookeeper, 
timeout:600*TimeUnit.SECONDS) {
+            assertTrue zookeeper.getAttribute(Startable.SERVICE_UP)
+        }
+        assertFalse zookeeper.getAttribute(Startable.SERVICE_UP)
+    }
+
+    /**
+     * Test that we can start a broker and zookeeper together.
+     */
+    @Test(groups = "Integration")
+    public void testBrokerPlusZookeeper() {
+        KafkaZookeeper zookeeper = 
app.createAndManageChild(BasicEntitySpec.newInstance(KafkaZookeeper.class));
+        KafkaBroker broker = 
app.createAndManageChild(BasicEntitySpec.newInstance(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER,
 zookeeper));
+
+        zookeeper.start([ testLocation ])
+        executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) {
+            assertTrue zookeeper.getAttribute(Startable.SERVICE_UP)
+        }
+    
+        broker.start([ testLocation ])
+        executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) {
+            assertTrue broker.getAttribute(Startable.SERVICE_UP)
+        }
+    }
+
+    /**
+     * Test that we can start a cluster with zookeeper and one broker.
+     */
+    @Test(groups = "Integration")
+    public void testSingleBrokerCluster() {
+        KafkaCluster cluster = 
app.createAndManageChild(BasicEntitySpec.newInstance(KafkaCluster.class).configure(KafkaCluster.INITIAL_SIZE,
 1));
+
+        cluster.start([ testLocation ])
+        executeUntilSucceedsWithShutdown(cluster, 
timeout:600*TimeUnit.SECONDS) {
+            assertTrue cluster.getAttribute(Startable.SERVICE_UP)
+            Entities.dumpInfo(cluster)
+        }
+        assertFalse cluster.getAttribute(Startable.SERVICE_UP)
+    }
+
+    // TODO test with API sending messages
+    // TODO test that sensors update
+    // TODO add demo application
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/37e890c2/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
 
b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
new file mode 100644
index 0000000..6229b4e
--- /dev/null
+++ 
b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import brooklyn.entity.AbstractEc2LiveTest;
+import brooklyn.location.Location;
+
+public class KafkaLiveTest extends AbstractEc2LiveTest {
+
+    /**
+     * Test Kafka cluster operation.
+     */
+    @Override
+    protected void doTest(Location loc) throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+}

Reply via email to