Repository: incubator-brooklyn
Updated Branches:
  refs/heads/master a1efa470d -> be9ca632c


Kafka - fix shell escaping of the ZooKeeper URL and topic name in the topic-creation command


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/1e8899f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/1e8899f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/1e8899f0

Branch: refs/heads/master
Commit: 1e8899f0a6427da6e11a180b69aa620ed52d2dfa
Parents: efaceed
Author: Valentin Aitken <[email protected]>
Authored: Fri Jul 17 16:26:28 2015 +0100
Committer: Valentin Aitken <[email protected]>
Committed: Thu Jul 23 11:23:24 2015 +0300

----------------------------------------------------------------------
 .../messaging/kafka/KafkaZooKeeperSshDriver.java    |  8 ++++++--
 .../messaging/kafka/KafkaIntegrationTest.java       | 13 +++++++++----
 .../entity/messaging/kafka/KafkaSupport.java        | 16 +++++-----------
 3 files changed, 20 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1e8899f0/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
index fd57d95..d5b47e3 100644
--- 
a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
+++ 
b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZooKeeperSshDriver.java
@@ -22,10 +22,11 @@ import java.util.Map;
 
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.zookeeper.ZooKeeperNode;
 import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.util.collections.MutableMap;
 
+import static 
brooklyn.util.text.StringEscapes.BashStringEscapes.escapeLiteralForDoubleQuotedBash;
+
 public class KafkaZooKeeperSshDriver extends AbstractfKafkaSshDriver 
implements KafkaZooKeeperDriver {
 
     public KafkaZooKeeperSshDriver(KafkaZooKeeperImpl entity, 
SshMachineLocation machine) {
@@ -72,7 +73,10 @@ public class KafkaZooKeeperSshDriver extends 
AbstractfKafkaSshDriver implements
         String zookeeperUrl = getEntity().getAttribute(Attributes.HOSTNAME) + 
":" + getZookeeperPort();
         newScript(CUSTOMIZING)
                 .failOnNonZeroResultCode()
-                .body.append(String.format("./bin/%s  --create --zookeeper %s 
--replication-factor 1 --partitions 1 --topic %s", getTopicsScriptName(), 
zookeeperUrl, topic))
+                .body.append(String.format("./bin/%s  --create --zookeeper 
\"%s\" --replication-factor 1 --partitions 1 --topic \"%s\"",
+                                           getTopicsScriptName(),
+                                           
escapeLiteralForDoubleQuotedBash(zookeeperUrl),
+                                           
escapeLiteralForDoubleQuotedBash(topic)))
                 .execute();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1e8899f0/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
 
b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
index 9f490d9..c6487e5 100644
--- 
a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
+++ 
b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
@@ -125,11 +125,16 @@ public class KafkaIntegrationTest {
 
         Entities.dumpInfo(cluster);
 
-        KafkaSupport support = new KafkaSupport(cluster);
+        final KafkaSupport support = new KafkaSupport(cluster);
 
         support.sendMessage("brooklyn", "TEST_MESSAGE");
-        Thread.sleep(Duration.seconds(5).toMilliseconds());
-        String message = support.getMessage("brooklyn");
-        assertEquals(message, "TEST_MESSAGE");
+
+        Asserts.succeedsEventually(MutableMap.of("timeout", 
Duration.FIVE_SECONDS), new Runnable() {
+            @Override
+            public void run() {
+                String message = support.getMessage("brooklyn");
+                assertEquals(message, "TEST_MESSAGE");
+            }
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/1e8899f0/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git 
a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
 
b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
index c80befa..e035784 100644
--- 
a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ 
b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -22,7 +22,6 @@ import brooklyn.entity.Entity;
 import brooklyn.entity.basic.EntityPredicates;
 import brooklyn.entity.zookeeper.ZooKeeperNode;
 
-import brooklyn.util.time.Duration;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
@@ -66,16 +65,11 @@ public class KafkaSupport {
             props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
 
             Producer<String, String> producer = new KafkaProducer<>(props);
-            try {
-                ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
-                Thread.sleep(Duration.seconds(1).toMilliseconds());
-
-                ProducerRecord<String, String> data = new 
ProducerRecord<>(topic, message);
-                producer.send(data);
-                producer.close();
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
+            ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
+
+            ProducerRecord<String, String> data = new ProducerRecord<>(topic, 
message);
+            producer.send(data);
+            producer.close();
         } else {
             throw new InvalidParameterException("No kafka broker node found");
         }

Reply via email to