[BEAM-1686] Use random MQTT clientID when not defined to avoid NPE

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/752ad8a0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/752ad8a0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/752ad8a0

Branch: refs/heads/gearpump-runner
Commit: 752ad8a0ec5597ca84c27cb21862e32d05a81420
Parents: fdba784
Author: Borisa Zivkovic <borisa.zivko...@huawei.com>
Authored: Fri Mar 10 12:20:13 2017 +0000
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Fri Mar 10 17:00:57 2017 +0100

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/mqtt/MqttIO.java     |  4 ++
 .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 61 ++++++++++++++++++++
 2 files changed, 65 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/752ad8a0/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java 
b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 26234cf..46f2dcc 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -209,6 +209,10 @@ public class MqttIO {
         String clientId = getClientId() + "-" + UUID.randomUUID().toString();
         LOG.debug("MQTT client id set to {}", clientId);
         client.setClientId(clientId);
+      } else {
+        String clientId = UUID.randomUUID().toString();
+        LOG.debug("MQTT client id set to random value {}", clientId);
+        client.setClientId(clientId);
       }
       return client;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/752ad8a0/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java 
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
index 8a82f40..28ca5f7 100644
--- 
a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
+++ 
b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
+import org.apache.beam.sdk.io.mqtt.MqttIO.Read;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -79,6 +80,66 @@ public class MqttIOTest {
 
   @Test(timeout = 60 * 1000)
   @Category(RunnableOnService.class)
+  public void testReadNoClientId() throws Exception {
+    final String topicName = "READ_TOPIC_NO_CLIENT_ID";
+    Read mqttReader = MqttIO.read()
+    .withConnectionConfiguration(
+        MqttIO.ConnectionConfiguration.create(
+            "tcp://localhost:" + port,
+            topicName))
+  .withMaxNumRecords(10);
+    PCollection<byte[]> output = pipeline.apply(mqttReader);
+    PAssert.that(output).containsInAnyOrder(
+        "This is test 0".getBytes(),
+        "This is test 1".getBytes(),
+        "This is test 2".getBytes(),
+        "This is test 3".getBytes(),
+        "This is test 4".getBytes(),
+        "This is test 5".getBytes(),
+        "This is test 6".getBytes(),
+        "This is test 7".getBytes(),
+        "This is test 8".getBytes(),
+        "This is test 9".getBytes()
+    );
+
+    // produce messages on the brokerService in another thread
+    // This thread prevents to block the pipeline waiting for new messages
+    MQTT client = new MQTT();
+    client.setHost("tcp://localhost:" + port);
+    final BlockingConnection publishConnection = client.blockingConnection();
+    publishConnection.connect();
+    Thread publisherThread = new Thread() {
+      public void run() {
+        try {
+          LOG.info("Waiting pipeline connected to the MQTT broker before 
sending "
+              + "messages ...");
+          boolean pipelineConnected = false;
+          while (!pipelineConnected) {
+            Thread.sleep(1000);
+            for (Connection connection : 
brokerService.getBroker().getClients()) {
+              if (!connection.getConnectionId().isEmpty()) {
+                pipelineConnected = true;
+              }
+            }
+          }
+          for (int i = 0; i < 10; i++) {
+            publishConnection.publish(topicName, ("This is test " + 
i).getBytes(),
+                QoS.AT_LEAST_ONCE, false);
+          }
+        } catch (Exception e) {
+          // nothing to do
+        }
+      }
+    };
+    publisherThread.start();
+    pipeline.run();
+
+    publishConnection.disconnect();
+    publisherThread.join();
+  }
+
+  @Test(timeout = 60 * 1000)
+  @Category(RunnableOnService.class)
   public void testRead() throws Exception {
     PCollection<byte[]> output = pipeline.apply(
         MqttIO.read()

Reply via email to