[BEAM-1686] Use random MQTT clientID when not defined to avoid NPE
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/752ad8a0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/752ad8a0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/752ad8a0 Branch: refs/heads/gearpump-runner Commit: 752ad8a0ec5597ca84c27cb21862e32d05a81420 Parents: fdba784 Author: Borisa Zivkovic <borisa.zivko...@huawei.com> Authored: Fri Mar 10 12:20:13 2017 +0000 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Fri Mar 10 17:00:57 2017 +0100 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/mqtt/MqttIO.java | 4 ++ .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 61 ++++++++++++++++++++ 2 files changed, 65 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/752ad8a0/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 26234cf..46f2dcc 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -209,6 +209,10 @@ public class MqttIO { String clientId = getClientId() + "-" + UUID.randomUUID().toString(); LOG.debug("MQTT client id set to {}", clientId); client.setClientId(clientId); + } else { + String clientId = UUID.randomUUID().toString(); + LOG.debug("MQTT client id set to random value {}", clientId); + client.setClientId(clientId); } return client; } http://git-wip-us.apache.org/repos/asf/beam/blob/752ad8a0/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 8a82f40..28ca5f7 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.Connection; +import org.apache.beam.sdk.io.mqtt.MqttIO.Read; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -79,6 +80,66 @@ public class MqttIOTest { @Test(timeout = 60 * 1000) @Category(RunnableOnService.class) + public void testReadNoClientId() throws Exception { + final String topicName = "READ_TOPIC_NO_CLIENT_ID"; + Read mqttReader = MqttIO.read() + .withConnectionConfiguration( + MqttIO.ConnectionConfiguration.create( + "tcp://localhost:" + port, + topicName)) + .withMaxNumRecords(10); + PCollection<byte[]> output = pipeline.apply(mqttReader); + PAssert.that(output).containsInAnyOrder( + "This is test 0".getBytes(), + "This is test 1".getBytes(), + "This is test 2".getBytes(), + "This is test 3".getBytes(), + "This is test 4".getBytes(), + "This is test 5".getBytes(), + "This is test 6".getBytes(), + "This is test 7".getBytes(), + "This is test 8".getBytes(), + "This is test 9".getBytes() + ); + + // produce messages on the brokerService in another thread + // This thread prevents to block the pipeline waiting for new messages + MQTT client = new MQTT(); + client.setHost("tcp://localhost:" + port); + final BlockingConnection publishConnection = client.blockingConnection(); + publishConnection.connect(); + Thread publisherThread = new Thread() { + public void run() { + try { + LOG.info("Waiting pipeline connected to the MQTT broker before sending " + + "messages ..."); + boolean pipelineConnected = false; + while (!pipelineConnected) { + Thread.sleep(1000); + for (Connection connection : brokerService.getBroker().getClients()) { + if (!connection.getConnectionId().isEmpty()) { + pipelineConnected = true; + } + } + } + for (int i = 0; i < 10; i++) { + publishConnection.publish(topicName, ("This is test " + i).getBytes(), + QoS.AT_LEAST_ONCE, false); + } + } catch (Exception e) { + // nothing to do + } + } + }; + publisherThread.start(); + pipeline.run(); + + publishConnection.disconnect(); + publisherThread.join(); + } + + @Test(timeout = 60 * 1000) + @Category(RunnableOnService.class) public void testRead() throws Exception { PCollection<byte[]> output = pipeline.apply( MqttIO.read()