tkaymak commented on code in PR #39180:
URL: https://github.com/apache/beam/pull/39180#discussion_r3506438174


##########
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java:
##########
@@ -299,29 +300,42 @@ private void populateDisplayData(DisplayData.Builder 
builder) {
       builder.addIfNotNull(DisplayData.item("username", getUsername()));
     }
 
-    private MQTT createClient() throws Exception {
+    private Mqtt3BlockingClient createClient() throws Exception {
       LOG.debug("Creating MQTT client to {}", getServerUri());
-      MQTT client = new MQTT();
-      client.setHost(getServerUri());
-      if (getUsername() != null) {
-        LOG.debug("MQTT client uses username {}", getUsername());
-        client.setUserName(getUsername());
-        client.setPassword(getPassword());
+      URI uri = new URI(getServerUri());
+      String host = uri.getHost();
+      int port = uri.getPort();
+      if (port == -1) {
+        port = "ssl".equals(uri.getScheme()) || "tls".equals(uri.getScheme()) 
? 8883 : 1883;
+      }
+
+      Mqtt3ClientBuilder builder = 
Mqtt3Client.builder().serverHost(host).serverPort(port);
+
+      if ("ssl".equals(uri.getScheme()) || "tls".equals(uri.getScheme())) {
+        builder = builder.sslWithDefaultConfig();
       }
-      if (getClientId() != null) {
-        String clientId = getClientId() + "-" + UUID.randomUUID().toString();
-        clientId =
-            clientId.substring(0, Math.min(clientId.length(), 
MQTT_3_1_MAX_CLIENT_ID_LENGTH));
-        LOG.debug("MQTT client id set to {}", clientId);
-        client.setClientId(clientId);
+
+      String clientId = getClientId();
+      if (clientId == null) {
+        clientId = UUID.randomUUID().toString();
       } else {
-        String clientId = UUID.randomUUID().toString();
-        clientId =
-            clientId.substring(0, Math.min(clientId.length(), 
MQTT_3_1_MAX_CLIENT_ID_LENGTH));
-        LOG.debug("MQTT client id set to random value {}", clientId);
-        client.setClientId(clientId);
+        clientId = clientId + "-" + UUID.randomUUID().toString();
       }
-      return client;
+      clientId = clientId.substring(0, Math.min(clientId.length(), 
MQTT_3_1_MAX_CLIENT_ID_LENGTH));
+      LOG.debug("MQTT client id set to {}", clientId);
+      builder = builder.identifier(clientId);
+
+      if (getUsername() != null) {
+        LOG.debug("MQTT client uses username {}", getUsername());
+        builder =
+            builder
+                .simpleAuth()
+                .username(getUsername())
+                .password(getPassword().getBytes(StandardCharsets.UTF_8))
+                .applySimpleAuth();
+      }

Review Comment:
   seems reasonable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to