gemini-code-assist[bot] commented on code in PR #39180:
URL: https://github.com/apache/beam/pull/39180#discussion_r3502189983


##########
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java:
##########
@@ -299,29 +300,42 @@ private void populateDisplayData(DisplayData.Builder builder) {
       builder.addIfNotNull(DisplayData.item("username", getUsername()));
     }
 
-    private MQTT createClient() throws Exception {
+    private Mqtt3BlockingClient createClient() throws Exception {
       LOG.debug("Creating MQTT client to {}", getServerUri());
-      MQTT client = new MQTT();
-      client.setHost(getServerUri());
-      if (getUsername() != null) {
-        LOG.debug("MQTT client uses username {}", getUsername());
-        client.setUserName(getUsername());
-        client.setPassword(getPassword());
+      URI uri = new URI(getServerUri());
+      String host = uri.getHost();
+      int port = uri.getPort();
+      if (port == -1) {
+        port = "ssl".equals(uri.getScheme()) || "tls".equals(uri.getScheme()) ? 8883 : 1883;
+      }
+
+      Mqtt3ClientBuilder builder = Mqtt3Client.builder().serverHost(host).serverPort(port);
+
+      if ("ssl".equals(uri.getScheme()) || "tls".equals(uri.getScheme())) {
+        builder = builder.sslWithDefaultConfig();
       }
-      if (getClientId() != null) {
-        String clientId = getClientId() + "-" + UUID.randomUUID().toString();
-        clientId =
-            clientId.substring(0, Math.min(clientId.length(), MQTT_3_1_MAX_CLIENT_ID_LENGTH));
-        LOG.debug("MQTT client id set to {}", clientId);
-        client.setClientId(clientId);
+
+      String clientId = getClientId();
+      if (clientId == null) {
+        clientId = UUID.randomUUID().toString();
       } else {
-        String clientId = UUID.randomUUID().toString();
-        clientId =
-            clientId.substring(0, Math.min(clientId.length(), MQTT_3_1_MAX_CLIENT_ID_LENGTH));
-        LOG.debug("MQTT client id set to random value {}", clientId);
-        client.setClientId(clientId);
+        clientId = clientId + "-" + UUID.randomUUID().toString();
       }
-      return client;
+      clientId = clientId.substring(0, Math.min(clientId.length(), MQTT_3_1_MAX_CLIENT_ID_LENGTH));
+      LOG.debug("MQTT client id set to {}", clientId);
+      builder = builder.identifier(clientId);
+
+      if (getUsername() != null) {
+        LOG.debug("MQTT client uses username {}", getUsername());
+        builder =
+            builder
+                .simpleAuth()
+                .username(getUsername())
+                .password(getPassword().getBytes(StandardCharsets.UTF_8))
+                .applySimpleAuth();
+      }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   If `getPassword()` returns `null`, calling 
`getPassword().getBytes(StandardCharsets.UTF_8)` throws a 
`NullPointerException`. Check that the password is non-null before setting it 
on the client builder, and fall back to username-only authentication otherwise.
   
   ```java
         if (getUsername() != null) {
           LOG.debug("MQTT client uses username {}", getUsername());
           if (getPassword() != null) {
             builder =
                 builder
                     .simpleAuth()
                     .username(getUsername())
                     .password(getPassword().getBytes(StandardCharsets.UTF_8))
                     .applySimpleAuth();
           } else {
             builder = builder.simpleAuth().username(getUsername()).applySimpleAuth();
           }
         }
   ```



##########
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java:
##########
@@ -633,10 +659,13 @@ public boolean advance() throws IOException {
 
     @Override
     public void close() throws IOException {
-      LOG.debug("Closing MQTT reader (client ID {})", client.getClientId());
+      LOG.debug("Closing MQTT reader (client ID {})", clientId);
       try {
-        if (connection != null) {
-          connection.disconnect();
+        if (publishes != null) {
+          publishes.close();
+        }
+        if (client != null) {
+          client.disconnect();
         }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If `publishes.close()` throws an exception, `client.disconnect()` will not 
be called, which can lead to connection leaks. We should ensure both resources 
are closed/disconnected even if one of them fails.
   
   ```java
           if (publishes != null) {
             try {
               publishes.close();
             } catch (Exception e) {
               LOG.warn("Error closing publishes stream", e);
             }
           }
           if (client != null) {
             client.disconnect();
           }
   ```



##########
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java:
##########
@@ -603,11 +619,20 @@ public boolean start() throws IOException {
       Read<T> spec = source.spec;
       try {
         client = spec.connectionConfiguration().createClient();
-        LOG.debug("Reader client ID is {}", client.getClientId());
-        checkpointPreparer.clientId = client.getClientId().toString();
-        connection = createConnection(client);
-        connection.subscribe(
-            new Topic[] {new Topic(spec.connectionConfiguration().getTopic(), QoS.AT_LEAST_ONCE)});
+        this.clientId = client.getConfig().getClientIdentifier().map(Object::toString).orElse("");
+        LOG.debug("Reader client ID is {}", clientId);
+        checkpointPreparer.clientId = clientId;
+        client.connect();
+
+        // Subscribe and get the publishes stream with manual acks enabled
+        publishes = client.publishes(MqttGlobalPublishFilter.ALL, true);
+
+        client
+            .subscribeWith()
+            .topicFilter(spec.connectionConfiguration().getTopic())
+            .qos(MqttQos.AT_LEAST_ONCE)
+            .send();

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If an exception occurs after `client.connect()` succeeds (e.g., during 
subscription or getting the publishes stream), the client connection is leaked 
because it is never disconnected. We should catch exceptions after connection 
and disconnect the client before rethrowing.
   
   ```java
           client.connect();
           try {
             // Subscribe and get the publishes stream with manual acks enabled
             publishes = client.publishes(MqttGlobalPublishFilter.ALL, true);
   
             client
                 .subscribeWith()
                 .topicFilter(spec.connectionConfiguration().getTopic())
                 .qos(MqttQos.AT_LEAST_ONCE)
                 .send();
           } catch (Exception e) {
             try {
               client.disconnect();
             } catch (Exception disconnectEx) {
               e.addSuppressed(disconnectEx);
             }
             throw e;
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to