Copilot commented on code in PR #22: URL: https://github.com/apache/pulsar-connectors/pull/22#discussion_r3209492300
########## mqtt/build.gradle.kts: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("pulsar-connectors.java-conventions") + id("pulsar-connectors.nar-conventions") +} + +dependencies { + implementation(libs.pulsar.io.common) + implementation(libs.pulsar.io.core) + implementation(libs.pulsar.functions.instance) + implementation(libs.jackson.databind) + implementation(libs.jackson.dataformat.yaml) + implementation(libs.commons.lang3) + implementation(libs.hivemq.mqtt.client) Review Comment: `MqttSinkConfig` uses Guava `Preconditions`, but this module doesn’t declare a direct Guava dependency. Other connectors that use `Preconditions` explicitly add `implementation(libs.guava)`; add it here to avoid relying on transitive dependencies and potential compilation breakage if the dependency graph changes. ########## mqtt/src/main/resources/META-INF/services/pulsar-io.yaml: ########## @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +name: mqtt-sink Review Comment: Connector name `mqtt-sink` is inconsistent with other connectors’ `pulsar-io.yaml` naming (typically the system name like `redis`, `solr`, `rabbitmq`). Consider using `name: mqtt` to match established conventions and simplify user-facing configuration. ########## mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.core.annotations.Connector; +import org.apache.pulsar.io.core.annotations.IOType; + +@Connector( + name = "mqtt-sink", + type = IOType.SINK, + help = "A sink connector that moves messages from Pulsar to MQTT.", + configClass = MqttSinkConfig.class +) Review Comment: `@Connector(name = "mqtt-sink")` should match the connector naming convention used across this repo (generally just the system name, e.g., `redis`, `solr`) and should be consistent with `META-INF/services/pulsar-io.yaml`. Consider renaming to `mqtt` (and updating the service descriptor accordingly) to avoid an outlier connector name. ########## mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import lombok.Data; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.core.annotations.FieldDoc; + +@Data +@Accessors(chain = true) +public class MqttSinkConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + @FieldDoc( + required = true, + defaultValue = "", + help = "The MQTT broker host.") + private String serverHost; + + @FieldDoc( + required = true, + defaultValue = "1883", + help = "The MQTT broker port.") + private int serverPort = 1883; + + @FieldDoc( + required = true, + defaultValue = "", + help = "The MQTT topic to publish messages to.") + private String topic; + + @FieldDoc( + defaultValue = "", + help = "MQTT client id used for the broker connection.") + private String clientId; + + @FieldDoc( + defaultValue = "", + sensitive = true, + help = "MQTT username.") + private String username; + + @FieldDoc( + defaultValue = "", + sensitive = true, + help = "MQTT password.") + private String password; + + @FieldDoc( + defaultValue = "0", + help = "MQTT QoS level for outgoing messages. Valid values: 0, 1, 2.") + private int qos = 0; + + @FieldDoc( + defaultValue = "60", + help = "MQTT keep alive interval in seconds.") + private int keepAliveIntervalSec = 60; + + @FieldDoc( + defaultValue = "10000", + help = "Timeout in milliseconds for MQTT connect/disconnect operations.") + private long connectionTimeoutMs = 10000L; + + @FieldDoc( + defaultValue = "true", + help = "Whether to start with a clean session.") + private boolean cleanStart = true; + + @FieldDoc( + defaultValue = "false", + help = "Enable SSL/TLS with the client default SSL configuration.") + private boolean sslEnabled = false; + + public static MqttSinkConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return mapper.readValue(new File(yamlFile), MqttSinkConfig.class); + } + + public static MqttSinkConfig load(Map<String, Object> map, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, MqttSinkConfig.class, sinkContext); + } + + public void validate() { + Preconditions.checkArgument(StringUtils.isNotBlank(serverHost), "serverHost cannot be blank"); + Preconditions.checkArgument(serverPort > 0, "serverPort must be a positive integer"); + Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank"); + Preconditions.checkArgument(qos >= 0 && qos <= 2, "qos must be one of 0, 1, 2"); + Preconditions.checkArgument(keepAliveIntervalSec >= 0, "keepAliveIntervalSec must be >= 0"); + Preconditions.checkArgument(connectionTimeoutMs > 0, "connectionTimeoutMs must be > 0"); Review Comment: If `password` is configured but `username` is blank, the sink currently connects without auth and silently ignores the password (because auth is only enabled when `username` is non-blank). Add validation to reject `password`-without-`username` (or otherwise ensure the intended auth behavior) to prevent surprising misconfigurations. ########## mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import org.apache.pulsar.io.core.SinkContext; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +public class MqttSinkConfigTest { + + @Test + public void loadFromYamlFileTest() throws IOException { + File yamlFile = getFile("sinkConfig.yaml"); + MqttSinkConfig config = MqttSinkConfig.load(yamlFile.getAbsolutePath()); + assertNotNull(config); + assertEquals(config.getServerHost(), "localhost"); + assertEquals(config.getServerPort(), 1883); + assertEquals(config.getTopic(), "test/topic"); + assertEquals(config.getClientId(), "pulsar-mqtt-test"); + assertEquals(config.getUsername(), "mqtt-user"); + assertEquals(config.getPassword(), "mqtt-password"); + assertEquals(config.getQos(), 1); + assertEquals(config.getKeepAliveIntervalSec(), 45); + assertEquals(config.getConnectionTimeoutMs(), 15000L); + assertTrue(config.isCleanStart()); + assertFalse(config.isSslEnabled()); + } + + @Test + public void loadFromMapTest() throws IOException { + SinkContext sinkContext = Mockito.mock(SinkContext.class); + MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(), sinkContext); + + assertNotNull(config); + assertEquals(config.getServerHost(), "localhost"); + assertEquals(config.getServerPort(), 1883); + assertEquals(config.getTopic(), "test/topic"); + assertEquals(config.getClientId(), "pulsar-mqtt-test"); + assertEquals(config.getQos(), 1); + } + + @Test + public void loadFromMapCredentialsFromSecretTest() throws IOException { + Map<String, Object> map = baseConfigMap(); + map.remove("username"); + map.remove("password"); + + SinkContext sinkContext = Mockito.mock(SinkContext.class); + Mockito.when(sinkContext.getSecret("username")).thenReturn("mqtt-user"); + Mockito.when(sinkContext.getSecret("password")).thenReturn("mqtt-password"); + + MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext); + assertEquals(config.getUsername(), "mqtt-user"); + assertEquals(config.getPassword(), "mqtt-password"); + } + + @Test + public void validValidateTest() throws IOException { + SinkContext sinkContext = Mockito.mock(SinkContext.class); + MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(), sinkContext); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "qos must be one of 0, 1, 2") + public void invalidQosValidateTest() throws IOException { + Map<String, Object> map = baseConfigMap(); + map.put("qos", 3); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext); + config.validate(); + } + + private static Map<String, Object> baseConfigMap() { + Map<String, Object> map = new HashMap<>(); + map.put("serverHost", "localhost"); + map.put("serverPort", 1883); + map.put("topic", "test/topic"); + map.put("clientId", "pulsar-mqtt-test"); + map.put("username", "mqtt-user"); + map.put("password", "mqtt-password"); + map.put("qos", 1); + map.put("keepAliveIntervalSec", 45); + map.put("connectionTimeoutMs", 15000); + map.put("cleanStart", true); + map.put("sslEnabled", false); + return map; + } + + private File getFile(String name) { + ClassLoader classLoader = getClass().getClassLoader(); + return new File(classLoader.getResource(name).getFile()); + } Review Comment: `getFile()` uses `classLoader.getResource(name).getFile()`, which can break with URL-encoded paths (spaces, special chars) and can NPE if the resource is missing. Prefer resolving via `Objects.requireNonNull(getResource(name))` and `Paths.get(url.toURI()).toFile()` (or use `getResourceAsStream`). ########## mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.mqtt; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import com.hivemq.client.mqtt.MqttClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.SinkContext; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class MqttSinkTest { + + private static final int MQTT_PORT = 1883; + private static final String TEST_TOPIC = "pulsar/mqtt/e2e"; + private static final DockerImageName MOSQUITTO_IMAGE = DockerImageName.parse("eclipse-mosquitto:2"); + + private final GenericContainer<?> mqttContainer = new GenericContainer<>(MOSQUITTO_IMAGE) + .withExposedPorts(MQTT_PORT); + + @BeforeClass(alwaysRun = true) + public void beforeClass() { + mqttContainer.start(); + } + + @AfterClass(alwaysRun = true) + public void afterClass() { + mqttContainer.stop(); + } + + @Test + public void testWriteE2EWithMosquitto() throws Exception { + BlockingQueue<String> receivedPayloads = new LinkedBlockingQueue<>(); + CountDownLatch ackLatch = new CountDownLatch(3); + AtomicBoolean failCalled = new AtomicBoolean(false); + + Mqtt5AsyncClient subscriber = MqttClient.builder() + .useMqttVersion5() + .serverHost(mqttContainer.getHost()) + .serverPort(mqttContainer.getMappedPort(MQTT_PORT)) + .identifier("mqtt-sink-e2e-subscriber") + .buildAsync(); + + subscriber.connectWith() + .cleanStart(true) + .send() + .get(10, TimeUnit.SECONDS); + subscriber.subscribeWith() + .topicFilter(TEST_TOPIC) + .callback(publish -> receivedPayloads.add( + new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8))) + .send() + .get(10, TimeUnit.SECONDS); + + Map<String, Object> config = new HashMap<>(); + config.put("serverHost", mqttContainer.getHost()); + config.put("serverPort", mqttContainer.getMappedPort(MQTT_PORT)); + config.put("topic", TEST_TOPIC); + config.put("qos", 1); + config.put("connectionTimeoutMs", 10000); + config.put("clientId", "mqtt-sink-e2e-publisher"); + + SinkContext sinkContext = mock(SinkContext.class); + try (MqttSink sink = new MqttSink()) { + sink.open(config, sinkContext); + + for (int i = 0; i < 3; i++) { + sink.write(new TestRecord(("msg-" + i).getBytes(StandardCharsets.UTF_8), ackLatch, failCalled)); + } + + assertTrue(ackLatch.await(10, TimeUnit.SECONDS), "Timed out waiting for record.ack()"); + assertFalse(failCalled.get(), "record.fail() should not be called on successful publish"); + + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-0"); + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-1"); + assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-2"); + } finally { + subscriber.disconnectWith() + .sessionExpiryInterval(0) + .send() + .get(10, TimeUnit.SECONDS); Review Comment: The `finally` block always tries to disconnect the subscriber; if connect/subscribe fails earlier, this disconnect can throw and mask the original failure. Wrap the disconnect in its own try/catch (or check connection state) so test failures keep the primary cause. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
