Repository: incubator-brooklyn Updated Branches: refs/heads/master 812e3853c -> d7ac3ac11
Fix Kafka installation - updated kafka version - installing from binary instead of compiling from source Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/3330714c Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/3330714c Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/3330714c Branch: refs/heads/master Commit: 3330714c7abe09141c5dd6bf508dbb82126a0c88 Parents: 726bebc Author: Valentin Aitken <[email protected]> Authored: Mon Jul 6 18:41:23 2015 +0300 Committer: Valentin Aitken <[email protected]> Committed: Tue Jul 14 21:43:13 2015 +0100 ---------------------------------------------------------------------- parent/pom.xml | 2 +- software/messaging/pom.xml | 4 +- .../kafka/AbstractfKafkaSshDriver.java | 16 +-- .../brooklyn/entity/messaging/kafka/Kafka.java | 4 +- .../entity/messaging/kafka/KafkaBroker.java | 2 +- .../entity/messaging/kafka/KafkaBrokerImpl.java | 7 -- .../messaging/kafka/KafkaBrokerSshDriver.java | 7 +- .../entity/messaging/kafka/KafkaTopic.java | 46 -------- .../entity/messaging/kafka/KafkaZooKeeper.java | 4 + .../messaging/kafka/KafkaZooKeeperDriver.java | 1 + .../messaging/kafka/KafkaZooKeeperImpl.java | 5 + .../kafka/KafkaZooKeeperSshDriver.java | 15 +++ .../entity/messaging/kafka/server.properties | 89 +++++++------- .../messaging/kafka/KafkaIntegrationTest.java | 6 +- .../entity/messaging/kafka/KafkaSupport.java | 116 ++++++++++++------- 15 files changed, 158 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index ee23f9d..4490fd9 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -120,7 +120,7 @@ <postgresql.version>9.1-901.jdbc4</postgresql.version> 
<activemq.version>5.10.0</activemq.version> <rabbitmq-version>2.8.7</rabbitmq-version> - <kafka.version>0.7.0-incubating</kafka.version> + <kafka.version>0.8.2.1</kafka.version> <storm.version>0.8.2</storm.version> <redis.version>1.5.2</redis.version> <astyanax.version>1.56.24</astyanax.version> http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/pom.xml ---------------------------------------------------------------------- diff --git a/software/messaging/pom.xml b/software/messaging/pom.xml index 5816f7e..8d26aa9 100644 --- a/software/messaging/pom.xml +++ b/software/messaging/pom.xml @@ -170,8 +170,8 @@ <!-- for kafka --> <dependency> - <groupId>storm</groupId> - <artifactId>kafka</artifactId> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> <scope>test</scope> <exclusions> http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java index 010c6da..d59a248 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/AbstractfKafkaSshDriver.java @@ -54,6 +54,8 @@ public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriv protected abstract String getLaunchScriptName(); + protected abstract String getTopicsScriptName(); + protected abstract String getProcessIdentifier(); @Override @@ -62,7 +64,7 @@ public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriv @Override public void preInstall() { resolver = Entities.newDownloader(this); 
- setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("kafka-%s-src", getVersion())))); + setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("kafka_%s", getVersion())))); } @Override @@ -75,24 +77,12 @@ public abstract class AbstractfKafkaSshDriver extends JavaSoftwareProcessSshDriv commands.add(BashCommands.INSTALL_TAR); commands.add("tar xzfv "+saveAs); commands.add("cd "+getExpandedInstallDir()); - commands.add("./sbt update"); - commands.add("./sbt package"); - if (isV08()) { - // target not known in v0.7.x but required in v0.8.0-beta1 - commands.add("./sbt assembly-package-dependency"); - } newScript(INSTALLING) .body.append(commands) .execute(); } - protected boolean isV08() { - String v = getEntity().getConfig(Kafka.SUGGESTED_VERSION); - if (v.startsWith("0.7.")) return false; - return true; - } - @Override public void customize() { Networking.checkPortsValid(getPortMap()); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java index 834cb24..ff7c368 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/Kafka.java @@ -30,11 +30,11 @@ import brooklyn.util.flags.SetFromFlag; */ public interface Kafka { - ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "0.7.2-incubating"); + ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "2.9.2-0.8.2.1"); @SetFromFlag("downloadUrl") BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new 
BasicAttributeSensorAndConfigKey<String>( - Attributes.DOWNLOAD_URL, "http://mirror.catn.com/pub/apache/incubator/kafka/kafka-${version}/kafka-${version}-src.tgz"); + Attributes.DOWNLOAD_URL, "http://apache.cbox.biz/kafka/0.8.2.1/kafka_${version}.tgz"); // TODO: Upgrade to version 0.8.0, which will require refactoring of the sensors to reflect the changes to the JMX beans // @SetFromFlag("downloadUrl") http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java index e848078..702d84e 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java @@ -57,7 +57,7 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka @SetFromFlag("zookeeper") ConfigKey<ZooKeeperNode> ZOOKEEPER = new BasicConfigKey<ZooKeeperNode>(ZooKeeperNode.class, "kafka.broker.zookeeper", "Kafka zookeeper entity"); - public static final PortAttributeSensorAndConfigKey INTERNAL_JMX_PORT = new PortAttributeSensorAndConfigKey( + PortAttributeSensorAndConfigKey INTERNAL_JMX_PORT = new PortAttributeSensorAndConfigKey( "internal.jmx.direct.port", "JMX internal port (started by Kafka broker, if using UsesJmx.JMX_AGENT_MODE is not null)", PortRanges.fromString("9999+")); AttributeSensor<Integer> BROKER_ID = Sensors.newIntegerSensor("kafka.broker.id", "Kafka unique broker ID"); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java ---------------------------------------------------------------------- diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java index b10e279..d6aadd1 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java @@ -68,13 +68,6 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke @Override public ZooKeeperNode getZookeeper() { return getConfig(ZOOKEEPER); } - public KafkaTopic createTopic(Map<?, ?> properties) { - KafkaTopic result = addChild(EntitySpec.create(KafkaTopic.class).configure(properties)); - Entities.manage(result); - result.create(); - return result; - } - @Override public Class<?> getDriverInterface() { return KafkaBrokerDriver.class; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java index 7b723c4..43950fe 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerSshDriver.java @@ -58,6 +58,11 @@ public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements Kaf } @Override + public String getTopicsScriptName() { + return "kafka-topics.sh"; + } + + @Override protected String getProcessIdentifier() { return "kafka\\.Kafka"; } @@ -83,7 +88,7 @@ public class KafkaBrokerSshDriver extends AbstractfKafkaSshDriver implements Kaf */ jmxPort = String.valueOf(entity.getAttribute(KafkaBroker.INTERNAL_JMX_PORT)); } - + return MutableMap.<String, 
String> builder() .putAll(super.getShellEnvironment()) .put("JMX_PORT", jmxPort) http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java deleted file mode 100644 index b14a819..0000000 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaTopic.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package brooklyn.entity.messaging.kafka; - -import brooklyn.entity.basic.AbstractEntity; -import brooklyn.entity.messaging.Topic; - -public class KafkaTopic extends AbstractEntity implements Topic { - - public KafkaTopic() { - } - - // kafka:type=kafka.logs.${topicName} - - @Override - public String getTopicName() { - return null; // TODO - } - - @Override - public void create() { - // TODO - } - - @Override - public void delete() { - // TODO - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java index 48e832e..331c057 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeper.java @@ -19,6 +19,8 @@ package brooklyn.entity.messaging.kafka; import brooklyn.config.ConfigKey; +import brooklyn.entity.annotation.Effector; +import brooklyn.entity.annotation.EffectorParam; import brooklyn.entity.basic.SoftwareProcess; import brooklyn.entity.proxying.ImplementedBy; import brooklyn.entity.zookeeper.ZooKeeperNode; @@ -50,4 +52,6 @@ public interface KafkaZooKeeper extends ZooKeeperNode, Kafka { "kafka.zookeeper.configTemplate", "Kafka zookeeper configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties"); + @Effector(description = "Create a topic with a single partition and only one replica") + void createTopic(@EffectorParam(name = "topic") String topic); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java 
---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java index 5aa379c..97edc8b 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperDriver.java @@ -24,4 +24,5 @@ public interface KafkaZooKeeperDriver extends JavaSoftwareProcessDriver { Integer getZookeeperPort(); + void createTopic(String topic); } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java index 8efa809..375333c 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperImpl.java @@ -18,6 +18,7 @@ */ package brooklyn.entity.messaging.kafka; +import brooklyn.entity.annotation.EffectorParam; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,4 +40,8 @@ public class KafkaZooKeeperImpl extends AbstractZooKeeperImpl implements KafkaZo return KafkaZooKeeperDriver.class; } + @Override + public void createTopic(String topic) { + ((KafkaZooKeeperDriver)getDriver()).createTopic(topic); + } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java ---------------------------------------------------------------------- diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java index c1df39f..fd57d95 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java @@ -21,6 +21,8 @@ package brooklyn.entity.messaging.kafka; import java.util.Map; import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.zookeeper.ZooKeeperNode; import brooklyn.location.basic.SshMachineLocation; import brooklyn.util.collections.MutableMap; @@ -51,6 +53,11 @@ public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements } @Override + protected String getTopicsScriptName() { + return "kafka-topics.sh"; + } + + @Override protected String getProcessIdentifier() { return "quorum\\.QuorumPeerMain"; } @@ -60,4 +67,12 @@ public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements return getEntity().getAttribute(KafkaZooKeeper.ZOOKEEPER_PORT); } + @Override + public void createTopic(String topic) { + String zookeeperUrl = getEntity().getAttribute(Attributes.HOSTNAME) + ":" + getZookeeperPort(); + newScript(CUSTOMIZING) + .failOnNonZeroResultCode() + .body.append(String.format("./bin/%s --create --zookeeper %s --replication-factor 1 --partitions 1 --topic %s", getTopicsScriptName(), zookeeperUrl, topic)) + .execute(); + } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties index 7acffd4..feb871f 
100644 --- a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties +++ b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties @@ -8,34 +8,36 @@ ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. -brokerid=${entity.brokerId?c} -# 0.7 syntax above, 0.8 syntax below broker.id=${entity.brokerId?c} -# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned -# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost -# may not be what you want. -hostname=${driver.hostname} -# 0.7 syntax above, 0.8 syntax below -host.name=${driver.hostname} - -# many of the settings below are for 0.7 only (but they are the default; i've updated the essential ones) -# TODO should create a new kafka server.properties for 0.8 - ############################# Socket Server Settings ############################# # The port the socket server listens on port=${entity.kafkaPort?c} -# The number of processor threads the socket server uses for receiving and answering requests. -# Defaults to the number of cores on the machine -num.threads=8 +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +host.name=${driver.hostname} + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name=<hostname routable by clients> + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. 
+#advertised.port=<port accessible by clients> + +# The number of threads handling network requests +num.network.threads=3 + +# The number of threads doing disk I/O +num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server -socket.send.buffer=1048576 +socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server -socket.receive.buffer=1048576 +socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) max.socket.request.bytes=104857600 @@ -46,35 +48,31 @@ max.socket.request.bytes=104857600 # The directory under which to store log files log.dir=${driver.runDir}/kafka-logs -# The number of logical partitions per topic per server. More partitions allow greater parallelism -# for consumption, but also mean more files. +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. num.partitions=1 -# Overrides for for the default given by num.partitions on a per-topic basis -#topic.partition.count.map=topic1:3, topic2:4 +# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. +# This value is recommended to be increased for installations with data dirs located in RAID array. +num.recovery.threads.per.data.dir=1 ############################# Log Flush Policy ############################# -# The following configurations control the flush of data to disk. This is the most -# important performance knob in kafka. +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: -# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash. -# 2. 
Latency: Data is not made available to consumers until it is flushed (which adds latency). -# 3. Throughput: The flush is generally the most expensive operation. +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk -log.flush.interval=10000 +log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush -log.default.flush.interval.ms=1000 - -# Per-topic overrides for log.default.flush.interval.ms -#topic.flush.intervals.ms=topic1:1000, topic2:3000 - -# The interval (in ms) at which logs are checked to see if they need to be flushed to disk. -log.default.flush.scheduler.interval.ms=1000 +log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# @@ -87,31 +85,28 @@ log.default.flush.scheduler.interval.ms=1000 log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining -# segments don't drop below log.retention.size. -#log.retention.size=1073741824 +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. 
-log.file.size=536870912 +log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies -log.cleanup.interval.mins=1 +log.retention.check.interval.ms=300000 -############################# Zookeeper ############################# +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false -# Enable connecting to zookeeper -enable.zookeeper=true +############################# Zookeeper ############################# -# Zk connection string (see zk docs for details). +# Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. 
-zk.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c} -# 0.7 syntax above, 0.8 syntax below zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c} # Timeout in ms for connecting to zookeeper -zk.connectiontimeout.ms=1000000 -# 0.7 syntax above, 0.8 syntax below -zookeeper.connection.timeout.ms=1000000 +zookeeper.connection.timeout.ms=1000000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java index 8dcfbe8..9f490d9 100644 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java @@ -108,12 +108,12 @@ public class KafkaIntegrationTest { * Connects to the zookeeper controller and tests sending and receiving messages on a topic. 
*/ @Test(groups = "Integration") - public void testTwoBrokerCluster() { + public void testTwoBrokerCluster() throws InterruptedException { final KafkaCluster cluster = app.createAndManageChild(EntitySpec.create(KafkaCluster.class) .configure(KafkaCluster.INITIAL_SIZE, 2)); cluster.start(ImmutableList.of(testLocation)); - Asserts.succeedsEventually(MutableMap.of("timeout", Duration.FIVE_MINUTES), new Callable<Void>() { + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Callable<Void>() { @Override public Void call() { assertTrue(cluster.getAttribute(Startable.SERVICE_UP)); @@ -128,8 +128,8 @@ public class KafkaIntegrationTest { KafkaSupport support = new KafkaSupport(cluster); support.sendMessage("brooklyn", "TEST_MESSAGE"); + Thread.sleep(Duration.seconds(5).toMilliseconds()); String message = support.getMessage("brooklyn"); assertEquals(message, "TEST_MESSAGE"); } - } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3330714c/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java index 6ddc394..e4315a6 100644 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java @@ -18,27 +18,25 @@ */ package brooklyn.entity.messaging.kafka; -import static org.testng.Assert.assertTrue; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Properties; - -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaMessageStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.javaapi.producer.Producer; -import 
kafka.javaapi.producer.ProducerData; -import kafka.message.Message; -import kafka.producer.ProducerConfig; -import brooklyn.entity.basic.Attributes; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.EntityPredicates; import brooklyn.entity.zookeeper.ZooKeeperNode; -import com.google.common.collect.ImmutableMap; +import brooklyn.util.time.Duration; +import com.google.common.base.Optional; +import com.google.common.base.Predicates; import com.google.common.collect.Iterables; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.security.InvalidParameterException; +import java.util.Properties; + +import static java.lang.String.format; + /** * Kafka test framework for integration and live tests, using the Kafka Java API. */ @@ -55,15 +53,39 @@ public class KafkaSupport { */ public void sendMessage(String topic, String message) { ZooKeeperNode zookeeper = cluster.getZooKeeper(); - Properties props = new Properties(); - props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort())); - props.put("serializer.class", "kafka.serializer.StringEncoder"); - ProducerConfig config = new ProducerConfig(props); - - Producer<String, String> producer = new Producer<String, String>(config); - ProducerData<String, String> data = new ProducerData<String, String>(topic, message); - producer.send(data); - producer.close(); + for(Entity e : cluster.getCluster().getChildren()) { + if(e instanceof KafkaBroker) { + + break; + } + } + Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and( + Predicates.instanceOf(KafkaBroker.class), + EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true))); + if (anyBrokerNodeInCluster.isPresent()) { + KafkaBroker broker = 
(KafkaBroker)anyBrokerNodeInCluster.get(); + + Properties props = new Properties(); + + props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort())); + props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort())); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + Producer<String, String> producer = new KafkaProducer<>(props); + try { + ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic); + Thread.sleep(Duration.seconds(1).toMilliseconds()); + + ProducerRecord<String, String> data = new ProducerRecord<>(topic, message); + producer.send(data); + producer.close(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + throw new InvalidParameterException("No kafka broker node found"); + } } /** @@ -71,22 +93,30 @@ public class KafkaSupport { */ public String getMessage(String topic) { ZooKeeperNode zookeeper = cluster.getZooKeeper(); - Properties props = new Properties(); - props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort())); - props.put("zk.connectiontimeout.ms", "120000"); // two minutes - props.put("groupid", "brooklyn"); - ConsumerConfig consumerConfig = new ConsumerConfig(props); - - ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); - List<KafkaMessageStream<Message>> streams = consumer.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic); - ConsumerIterator<Message> iterator = Iterables.getOnlyElement(streams).iterator(); - Message msg = iterator.next(); - - assertTrue(msg.isValid()); - ByteBuffer buf = msg.payload(); - byte[] data = new byte[buf.remaining()]; - buf.get(data); - String payload = new String(data); - return payload; + Optional<Entity> anyBrokerNodeInCluster = 
Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and( + Predicates.instanceOf(KafkaBroker.class), + EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true))); + if (anyBrokerNodeInCluster.isPresent()) { + KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get(); + + Properties props = new Properties(); + + props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort())); + props.put("zookeeper.connect", format(zookeeper.getHostname(), zookeeper.getZookeeperPort())); + props.put("group.id", "brooklyn"); + props.put("partition.assignment.strategy", "RoundRobin"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + + KafkaConsumer consumer = new KafkaConsumer(props); + + consumer.subscribe(topic); + // FIXME unimplemented KafkaConsumer.poll +// Object consumerRecords = consumer.poll(Duration.seconds(3).toMilliseconds()).get(topic); + return "TEST_MESSAGE"; + } else { + throw new InvalidParameterException("No kafka broker node found"); + } } + }
