Copilot commented on code in PR #22:
URL: https://github.com/apache/pulsar-connectors/pull/22#discussion_r3209492300


##########
mqtt/build.gradle.kts:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+    id("pulsar-connectors.java-conventions")
+    id("pulsar-connectors.nar-conventions")
+}
+
+dependencies {
+    implementation(libs.pulsar.io.common)
+    implementation(libs.pulsar.io.core)
+    implementation(libs.pulsar.functions.instance)
+    implementation(libs.jackson.databind)
+    implementation(libs.jackson.dataformat.yaml)
+    implementation(libs.commons.lang3)
+    implementation(libs.hivemq.mqtt.client)

Review Comment:
   `MqttSinkConfig` uses Guava `Preconditions`, but this module doesn’t declare 
a direct Guava dependency. Other connectors that use `Preconditions` explicitly 
add `implementation(libs.guava)`; add it here to avoid relying on transitive 
dependencies and potential compilation breakage if the dependency graph changes.
   



##########
mqtt/src/main/resources/META-INF/services/pulsar-io.yaml:
##########
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: mqtt-sink

Review Comment:
   Connector name `mqtt-sink` is inconsistent with other connectors’ 
`pulsar-io.yaml` naming (typically the system name like `redis`, `solr`, 
`rabbitmq`). Consider using `name: mqtt` to match established conventions and 
simplify user-facing configuration.
   



##########
mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import com.hivemq.client.mqtt.MqttClient;
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+@Connector(
+        name = "mqtt-sink",
+        type = IOType.SINK,
+        help = "A sink connector that moves messages from Pulsar to MQTT.",
+        configClass = MqttSinkConfig.class
+)

Review Comment:
   `@Connector(name = "mqtt-sink")` should match the connector naming 
convention used across this repo (generally just the system name, e.g., 
`redis`, `solr`) and should be consistent with 
`META-INF/services/pulsar-io.yaml`. Consider renaming to `mqtt` (and updating 
the service descriptor accordingly) to avoid an outlier connector name.



##########
mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+@Data
+@Accessors(chain = true)
+public class MqttSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "The MQTT broker host.")
+    private String serverHost;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "1883",
+            help = "The MQTT broker port.")
+    private int serverPort = 1883;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "The MQTT topic to publish messages to.")
+    private String topic;
+
+    @FieldDoc(
+            defaultValue = "",
+            help = "MQTT client id used for the broker connection.")
+    private String clientId;
+
+    @FieldDoc(
+            defaultValue = "",
+            sensitive = true,
+            help = "MQTT username.")
+    private String username;
+
+    @FieldDoc(
+            defaultValue = "",
+            sensitive = true,
+            help = "MQTT password.")
+    private String password;
+
+    @FieldDoc(
+            defaultValue = "0",
+            help = "MQTT QoS level for outgoing messages. Valid values: 0, 1, 
2.")
+    private int qos = 0;
+
+    @FieldDoc(
+            defaultValue = "60",
+            help = "MQTT keep alive interval in seconds.")
+    private int keepAliveIntervalSec = 60;
+
+    @FieldDoc(
+            defaultValue = "10000",
+            help = "Timeout in milliseconds for MQTT connect/disconnect 
operations.")
+    private long connectionTimeoutMs = 10000L;
+
+    @FieldDoc(
+            defaultValue = "true",
+            help = "Whether to start with a clean session.")
+    private boolean cleanStart = true;
+
+    @FieldDoc(
+            defaultValue = "false",
+            help = "Enable SSL/TLS with the client default SSL configuration.")
+    private boolean sslEnabled = false;
+
+    public static MqttSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), MqttSinkConfig.class);
+    }
+
+    public static MqttSinkConfig load(Map<String, Object> map, SinkContext 
sinkContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, MqttSinkConfig.class, 
sinkContext);
+    }
+
+    public void validate() {
+        Preconditions.checkArgument(StringUtils.isNotBlank(serverHost), 
"serverHost cannot be blank");
+        Preconditions.checkArgument(serverPort > 0, "serverPort must be a 
positive integer");
+        Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic 
cannot be blank");
+        Preconditions.checkArgument(qos >= 0 && qos <= 2, "qos must be one of 
0, 1, 2");
+        Preconditions.checkArgument(keepAliveIntervalSec >= 0, 
"keepAliveIntervalSec must be >= 0");
+        Preconditions.checkArgument(connectionTimeoutMs > 0, 
"connectionTimeoutMs must be > 0");

Review Comment:
   If `password` is configured but `username` is blank, the sink currently 
connects without auth and silently ignores the password (because auth is only 
enabled when `username` is non-blank). Add validation to reject 
`password`-without-`username` (or otherwise ensure the intended auth behavior) 
to prevent surprising misconfigurations.
   



##########
mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+public class MqttSinkConfigTest {
+
+    @Test
+    public void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
+        MqttSinkConfig config = 
MqttSinkConfig.load(yamlFile.getAbsolutePath());
+        assertNotNull(config);
+        assertEquals(config.getServerHost(), "localhost");
+        assertEquals(config.getServerPort(), 1883);
+        assertEquals(config.getTopic(), "test/topic");
+        assertEquals(config.getClientId(), "pulsar-mqtt-test");
+        assertEquals(config.getUsername(), "mqtt-user");
+        assertEquals(config.getPassword(), "mqtt-password");
+        assertEquals(config.getQos(), 1);
+        assertEquals(config.getKeepAliveIntervalSec(), 45);
+        assertEquals(config.getConnectionTimeoutMs(), 15000L);
+        assertTrue(config.isCleanStart());
+        assertFalse(config.isSslEnabled());
+    }
+
+    @Test
+    public void loadFromMapTest() throws IOException {
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(), 
sinkContext);
+
+        assertNotNull(config);
+        assertEquals(config.getServerHost(), "localhost");
+        assertEquals(config.getServerPort(), 1883);
+        assertEquals(config.getTopic(), "test/topic");
+        assertEquals(config.getClientId(), "pulsar-mqtt-test");
+        assertEquals(config.getQos(), 1);
+    }
+
+    @Test
+    public void loadFromMapCredentialsFromSecretTest() throws IOException {
+        Map<String, Object> map = baseConfigMap();
+        map.remove("username");
+        map.remove("password");
+
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        
Mockito.when(sinkContext.getSecret("username")).thenReturn("mqtt-user");
+        
Mockito.when(sinkContext.getSecret("password")).thenReturn("mqtt-password");
+
+        MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext);
+        assertEquals(config.getUsername(), "mqtt-user");
+        assertEquals(config.getPassword(), "mqtt-password");
+    }
+
+    @Test
+    public void validValidateTest() throws IOException {
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(), 
sinkContext);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "qos must be one of 0, 1, 2")
+    public void invalidQosValidateTest() throws IOException {
+        Map<String, Object> map = baseConfigMap();
+        map.put("qos", 3);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext);
+        config.validate();
+    }
+
+    private static Map<String, Object> baseConfigMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("serverHost", "localhost");
+        map.put("serverPort", 1883);
+        map.put("topic", "test/topic");
+        map.put("clientId", "pulsar-mqtt-test");
+        map.put("username", "mqtt-user");
+        map.put("password", "mqtt-password");
+        map.put("qos", 1);
+        map.put("keepAliveIntervalSec", 45);
+        map.put("connectionTimeoutMs", 15000);
+        map.put("cleanStart", true);
+        map.put("sslEnabled", false);
+        return map;
+    }
+
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }

Review Comment:
   `getFile()` uses `classLoader.getResource(name).getFile()`, which can break 
with URL-encoded paths (spaces, special chars) and can NPE if the resource is 
missing. Prefer resolving via `Objects.requireNonNull(getResource(name))` and 
`Paths.get(url.toURI()).toFile()` (or use `getResourceAsStream`).



##########
mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.hivemq.client.mqtt.MqttClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MqttSinkTest {
+
+    private static final int MQTT_PORT = 1883;
+    private static final String TEST_TOPIC = "pulsar/mqtt/e2e";
+    private static final DockerImageName MOSQUITTO_IMAGE = 
DockerImageName.parse("eclipse-mosquitto:2");
+
+    private final GenericContainer<?> mqttContainer = new 
GenericContainer<>(MOSQUITTO_IMAGE)
+            .withExposedPorts(MQTT_PORT);
+
+    @BeforeClass(alwaysRun = true)
+    public void beforeClass() {
+        mqttContainer.start();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void afterClass() {
+        mqttContainer.stop();
+    }
+
+    @Test
+    public void testWriteE2EWithMosquitto() throws Exception {
+        BlockingQueue<String> receivedPayloads = new LinkedBlockingQueue<>();
+        CountDownLatch ackLatch = new CountDownLatch(3);
+        AtomicBoolean failCalled = new AtomicBoolean(false);
+
+        Mqtt5AsyncClient subscriber = MqttClient.builder()
+                .useMqttVersion5()
+                .serverHost(mqttContainer.getHost())
+                .serverPort(mqttContainer.getMappedPort(MQTT_PORT))
+                .identifier("mqtt-sink-e2e-subscriber")
+                .buildAsync();
+
+        subscriber.connectWith()
+                .cleanStart(true)
+                .send()
+                .get(10, TimeUnit.SECONDS);
+        subscriber.subscribeWith()
+                .topicFilter(TEST_TOPIC)
+                .callback(publish -> receivedPayloads.add(
+                        new String(publish.getPayloadAsBytes(), 
StandardCharsets.UTF_8)))
+                .send()
+                .get(10, TimeUnit.SECONDS);
+
+        Map<String, Object> config = new HashMap<>();
+        config.put("serverHost", mqttContainer.getHost());
+        config.put("serverPort", mqttContainer.getMappedPort(MQTT_PORT));
+        config.put("topic", TEST_TOPIC);
+        config.put("qos", 1);
+        config.put("connectionTimeoutMs", 10000);
+        config.put("clientId", "mqtt-sink-e2e-publisher");
+
+        SinkContext sinkContext = mock(SinkContext.class);
+        try (MqttSink sink = new MqttSink()) {
+            sink.open(config, sinkContext);
+
+            for (int i = 0; i < 3; i++) {
+                sink.write(new TestRecord(("msg-" + 
i).getBytes(StandardCharsets.UTF_8), ackLatch, failCalled));
+            }
+
+            assertTrue(ackLatch.await(10, TimeUnit.SECONDS), "Timed out 
waiting for record.ack()");
+            assertFalse(failCalled.get(), "record.fail() should not be called 
on successful publish");
+
+            assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-0");
+            assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-1");
+            assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-2");
+        } finally {
+            subscriber.disconnectWith()
+                    .sessionExpiryInterval(0)
+                    .send()
+                    .get(10, TimeUnit.SECONDS);

Review Comment:
   The `finally` block always tries to disconnect the subscriber; if 
connect/subscribe fails earlier, this disconnect can throw and mask the 
original failure. Wrap the disconnect in its own try/catch (or check connection 
state) so test failures keep the primary cause.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to