Repository: incubator-brooklyn Updated Branches: refs/heads/master a1efa470d -> be9ca632c
Kafka - fix escaping Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/1e8899f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/1e8899f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/1e8899f0 Branch: refs/heads/master Commit: 1e8899f0a6427da6e11a180b69aa620ed52d2dfa Parents: efaceed Author: Valentin Aitken <[email protected]> Authored: Fri Jul 17 16:26:28 2015 +0100 Committer: Valentin Aitken <[email protected]> Committed: Thu Jul 23 11:23:24 2015 +0300 ---------------------------------------------------------------------- .../messaging/kafka/KafkaZooKeeperSshDriver.java | 8 ++++++-- .../messaging/kafka/KafkaIntegrationTest.java | 13 +++++++++---- .../entity/messaging/kafka/KafkaSupport.java | 16 +++++----------- 3 files changed, 20 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1e8899f0/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java index fd57d95..d5b47e3 100644 --- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java +++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java @@ -22,10 +22,11 @@ import java.util.Map; import brooklyn.config.ConfigKey; import brooklyn.entity.basic.Attributes; -import brooklyn.entity.zookeeper.ZooKeeperNode; import brooklyn.location.basic.SshMachineLocation; import brooklyn.util.collections.MutableMap; +import static brooklyn.util.text.StringEscapes.BashStringEscapes.escapeLiteralForDoubleQuotedBash; + public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements KafkaZooKeeperDriver { public KafkaZooKeeperSshDriver(KafkaZooKeeperImpl entity, SshMachineLocation machine) { @@ -72,7 +73,10 @@ public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver implements String zookeeperUrl = getEntity().getAttribute(Attributes.HOSTNAME) + ":" + getZookeeperPort(); newScript(CUSTOMIZING) .failOnNonZeroResultCode() - .body.append(String.format("./bin/%s --create --zookeeper %s --replication-factor 1 --partitions 1 --topic %s", getTopicsScriptName(), zookeeperUrl, topic)) + .body.append(String.format("./bin/%s --create --zookeeper \"%s\" --replication-factor 1 --partitions 1 --topic \"%s\"", + getTopicsScriptName(), + escapeLiteralForDoubleQuotedBash(zookeeperUrl), + escapeLiteralForDoubleQuotedBash(topic))) .execute(); } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1e8899f0/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java index 9f490d9..c6487e5 100644 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java @@ -125,11 +125,16 @@ public class KafkaIntegrationTest { Entities.dumpInfo(cluster); - KafkaSupport support = new KafkaSupport(cluster); + final KafkaSupport support = new KafkaSupport(cluster); support.sendMessage("brooklyn", "TEST_MESSAGE"); - Thread.sleep(Duration.seconds(5).toMilliseconds()); - String message = support.getMessage("brooklyn"); - assertEquals(message, "TEST_MESSAGE"); + + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.FIVE_SECONDS), new Runnable() { + @Override + public void run() { + String message = support.getMessage("brooklyn"); + assertEquals(message, "TEST_MESSAGE"); + } + }); } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1e8899f0/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java ---------------------------------------------------------------------- diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java index c80befa..e035784 100644 --- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java +++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java @@ -22,7 +22,6 @@ import brooklyn.entity.Entity; import brooklyn.entity.basic.EntityPredicates; import brooklyn.entity.zookeeper.ZooKeeperNode; -import brooklyn.util.time.Duration; import com.google.common.base.Optional; import com.google.common.base.Predicates; import com.google.common.collect.Iterables; @@ -66,16 +65,11 @@ public class KafkaSupport { props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); - try { - ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic); - Thread.sleep(Duration.seconds(1).toMilliseconds()); - - ProducerRecord<String, String> data = new ProducerRecord<>(topic, message); - producer.send(data); - producer.close(); - } catch (InterruptedException e) { - e.printStackTrace(); - } + ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic); + + ProducerRecord<String, String> data = new ProducerRecord<>(topic, message); + producer.send(data); + producer.close(); } else { throw new InvalidParameterException("No kafka broker node found"); }
