This is an automated email from the ASF dual-hosted git repository.

david-streamlio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 05379359 [feat][io] Add MQTT sink connector (#22)
05379359 is described below

commit 05379359e2d339a30099039ceeb2f84f50863387
Author: Dream95 <[email protected]>
AuthorDate: Sun May 10 23:29:20 2026 +0800

    [feat][io] Add MQTT sink connector (#22)
    
    * [feat][io] Add MQTT sink connector
    
    Signed-off-by: Dream95 <[email protected]>
    
    * [fix][io] Stabilize MQTT sink tests and split integration coverage
    
    Signed-off-by: Dream95 <[email protected]>
    
    * [fix][io] Handle disconnect interruption and correct serverPort FieldDoc
    
    Signed-off-by: Dream95 <[email protected]>
    
    ---------
    
    Signed-off-by: Dream95 <[email protected]>
---
 README.md                                          |   1 +
 distribution/io/build.gradle.kts                   |   1 +
 gradle/libs.versions.toml                          |   2 +
 mqtt/build.gradle.kts                              |  53 ++++++++
 .../pulsar/io/mqtt/MqttSinkIntegrationTest.java    | 150 +++++++++++++++++++++
 .../java/org/apache/pulsar/io/mqtt/MqttSink.java   | 136 +++++++++++++++++++
 .../org/apache/pulsar/io/mqtt/MqttSinkConfig.java  | 119 ++++++++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     |  22 +++
 .../apache/pulsar/io/mqtt/MqttSinkConfigTest.java  | 133 ++++++++++++++++++
 .../org/apache/pulsar/io/mqtt/MqttSinkTest.java    | 129 ++++++++++++++++++
 mqtt/src/test/resources/sinkConfig.yaml            |  29 ++++
 settings.gradle.kts                                |   1 +
 12 files changed, 776 insertions(+)

diff --git a/README.md b/README.md
index ea0694f7..174bf57e 100644
--- a/README.md
+++ b/README.md
@@ -58,6 +58,7 @@ mounting them into the `apachepulsar/pulsar` Docker image.
 | Kafka | Apache Kafka |
 | Kinesis | Amazon Kinesis Data Streams |
 | MongoDB | MongoDB |
+| MQTT | MQTT broker |
 | Redis | Redis |
 | Solr | Apache Solr |
 
diff --git a/distribution/io/build.gradle.kts b/distribution/io/build.gradle.kts
index 9aa7b814..66f3a6c7 100644
--- a/distribution/io/build.gradle.kts
+++ b/distribution/io/build.gradle.kts
@@ -65,6 +65,7 @@ dependencies {
     connectorNars(project(":influxdb"))
     connectorNars(project(":redis"))
     connectorNars(project(":solr"))
+    connectorNars(project(":mqtt"))
     connectorNars(project(":dynamodb"))
     connectorNars(project(":alluxio"))
     connectorNars(project(":azure-data-explorer"))
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index e397b65b..ca155662 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -176,6 +176,7 @@ aerospike-client = "4.5.0"
 aws-sdk = "1.12.788"
 aws-sdk2 = "2.32.28"
 rabbitmq-client = "5.28.0"
+hivemq-mqtt-client = "1.3.13"
 cassandra-driver = "3.11.2"
 mongodb-driver = "5.4.0"
 influxdb-client = "7.3.0"
@@ -494,6 +495,7 @@ solr-test-framework = { module = 
"org.apache.solr:solr-test-framework", version.
 solr-core = { module = "org.apache.solr:solr-core", version.ref = "solr" }
 # Messaging
 rabbitmq-amqp-client = { module = "com.rabbitmq:amqp-client", version.ref = 
"rabbitmq-client" }
+hivemq-mqtt-client = { module = "com.hivemq:hivemq-mqtt-client", version.ref = 
"hivemq-mqtt-client" }
 nsq-j = { module = "com.sproutsocial:nsq-j", version.ref = "nsq-client" }
 # Time series
 influxdb-client-java = { module = "com.influxdb:influxdb-client-java", 
version.ref = "influxdb-client" }
diff --git a/mqtt/build.gradle.kts b/mqtt/build.gradle.kts
new file mode 100644
index 00000000..671a8380
--- /dev/null
+++ b/mqtt/build.gradle.kts
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+    id("pulsar-connectors.java-conventions")
+    id("pulsar-connectors.nar-conventions")
+}
+
+// Dedicated source set for the Docker-backed integration tests. Its classpaths are the
+// main output plus the unit-test compile/runtime classpaths.
+val integrationTest by sourceSets.creating {
+    compileClasspath += sourceSets.main.get().output + 
configurations.testCompileClasspath.get()
+    runtimeClasspath += output + sourceSets.main.get().output + 
configurations.testRuntimeClasspath.get()
+    resources.srcDir(rootProject.file("gradle/test-resources"))
+}
+
+// Make the integrationTest configurations inherit everything declared for the unit tests.
+configurations[integrationTest.implementationConfigurationName].extendsFrom(configurations.testImplementation.get())
+configurations[integrationTest.runtimeOnlyConfigurationName].extendsFrom(configurations.testRuntimeOnly.get())
+
+dependencies {
+    implementation(libs.pulsar.io.common)
+    implementation(libs.pulsar.io.core)
+    implementation(libs.pulsar.functions.instance)
+    implementation(libs.jackson.databind)
+    implementation(libs.jackson.dataformat.yaml)
+    implementation(libs.commons.lang3)
+    implementation(libs.guava)
+    implementation(libs.hivemq.mqtt.client)
+
+    // Testcontainers is added to the integrationTest source set only.
+    add(integrationTest.implementationConfigurationName, libs.testcontainers)
+}
+
+// Test task that runs the classes compiled from the integrationTest source set.
+// shouldRunAfter only orders this task after `test` when both are scheduled; it does not
+// add `test` as a dependency.
+tasks.register<Test>("integrationTest") {
+    description = "Runs MQTT integration tests that require Docker."
+    group = LifecycleBasePlugin.VERIFICATION_GROUP
+    testClassesDirs = integrationTest.output.classesDirs
+    classpath = integrationTest.runtimeClasspath
+    shouldRunAfter("test")
+}
diff --git 
a/mqtt/src/integrationTest/java/org/apache/pulsar/io/mqtt/MqttSinkIntegrationTest.java
 
b/mqtt/src/integrationTest/java/org/apache/pulsar/io/mqtt/MqttSinkIntegrationTest.java
new file mode 100644
index 00000000..85b34c4e
--- /dev/null
+++ 
b/mqtt/src/integrationTest/java/org/apache/pulsar/io/mqtt/MqttSinkIntegrationTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.hivemq.client.mqtt.MqttClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end test for {@link MqttSink}: records written to the sink are published to a
+ * Mosquitto broker started through Testcontainers and read back by an MQTT 5 subscriber.
+ */
+public class MqttSinkIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(MqttSinkIntegrationTest.class);
+
+    private static final int MQTT_PORT = 1883;
+    private static final String TEST_TOPIC = "pulsar/mqtt/e2e";
+    private static final DockerImageName MOSQUITTO_IMAGE = DockerImageName.parse("eclipse-mosquitto:2");
+
+    // Mosquitto 2.x started with the image's default configuration runs in local-only mode
+    // (loopback listener), so the mapped port cannot be used from the test JVM. The image
+    // ships /mosquitto-no-auth.conf, which opens a listener on 1883 and allows anonymous
+    // clients; start the broker with it explicitly.
+    private final GenericContainer<?> mqttContainer = new GenericContainer<>(MOSQUITTO_IMAGE)
+            .withCommand("mosquitto", "-c", "/mosquitto-no-auth.conf")
+            .withExposedPorts(MQTT_PORT);
+
+    /** Starts the broker container once for the whole class. */
+    @BeforeClass(alwaysRun = true)
+    public void beforeClass() {
+        mqttContainer.start();
+    }
+
+    /** Stops the broker container after all tests in the class have run. */
+    @AfterClass(alwaysRun = true)
+    public void afterClass() {
+        mqttContainer.stop();
+    }
+
+    /**
+     * Writes three records through the sink at QoS 1 and asserts that every record is
+     * acked, none is failed, and the subscriber receives {@code msg-0..msg-2} in order.
+     */
+    @Test
+    public void testWriteE2EWithMosquitto() throws Exception {
+        BlockingQueue<String> receivedPayloads = new LinkedBlockingQueue<>();
+        CountDownLatch ackLatch = new CountDownLatch(3);
+        AtomicBoolean failCalled = new AtomicBoolean(false);
+
+        Mqtt5AsyncClient subscriber = MqttClient.builder()
+                .useMqttVersion5()
+                .serverHost(mqttContainer.getHost())
+                .serverPort(mqttContainer.getMappedPort(MQTT_PORT))
+                .identifier("mqtt-sink-e2e-subscriber")
+                .buildAsync();
+
+        // Subscribe before the sink publishes so that no message is missed.
+        subscriber.connectWith()
+                .cleanStart(true)
+                .send()
+                .get(10, TimeUnit.SECONDS);
+        subscriber.subscribeWith()
+                .topicFilter(TEST_TOPIC)
+                .callback(publish -> receivedPayloads.add(
+                        new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8)))
+                .send()
+                .get(10, TimeUnit.SECONDS);
+
+        Map<String, Object> config = new HashMap<>();
+        config.put("serverHost", mqttContainer.getHost());
+        config.put("serverPort", mqttContainer.getMappedPort(MQTT_PORT));
+        config.put("topic", TEST_TOPIC);
+        config.put("qos", 1);
+        config.put("connectionTimeoutMs", 10000);
+        config.put("clientId", "mqtt-sink-e2e-publisher");
+
+        SinkContext sinkContext = mock(SinkContext.class);
+        try (MqttSink sink = new MqttSink()) {
+            sink.open(config, sinkContext);
+
+            for (int i = 0; i < 3; i++) {
+                sink.write(new TestRecord(("msg-" + i).getBytes(StandardCharsets.UTF_8), ackLatch, failCalled));
+            }
+
+            assertTrue(ackLatch.await(10, TimeUnit.SECONDS), "Timed out waiting for record.ack()");
+            assertFalse(failCalled.get(), "record.fail() should not be called on successful publish");
+
+            assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-0");
+            assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-1");
+            assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-2");
+        } finally {
+            // Best-effort cleanup: a failed disconnect is logged instead of failing the test.
+            try {
+                subscriber.disconnectWith()
+                        .sessionExpiryInterval(0)
+                        .send()
+                        .get(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                log.warn("Failed to disconnect MQTT subscriber in test cleanup", e);
+            }
+        }
+    }
+
+    /**
+     * Minimal {@link Record} that counts down a latch on {@code ack()} and raises a flag
+     * on {@code fail()}.
+     */
+    private static final class TestRecord implements Record<byte[]> {
+        private final byte[] value;
+        private final CountDownLatch ackLatch;
+        private final AtomicBoolean failCalled;
+
+        private TestRecord(byte[] value, CountDownLatch ackLatch, AtomicBoolean failCalled) {
+            this.value = value;
+            this.ackLatch = ackLatch;
+            this.failCalled = failCalled;
+        }
+
+        @Override
+        public byte[] getValue() {
+            return value;
+        }
+
+        @Override
+        public void ack() {
+            ackLatch.countDown();
+        }
+
+        @Override
+        public void fail() {
+            failCalled.set(true);
+        }
+    }
+}
diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java 
b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java
new file mode 100644
index 00000000..cf7eee40
--- /dev/null
+++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import com.hivemq.client.mqtt.MqttClient;
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+@Connector(
+        name = "mqtt",
+        type = IOType.SINK,
+        help = "A sink connector that moves messages from Pulsar to MQTT.",
+        configClass = MqttSinkConfig.class
+)
+@Slf4j
+public class MqttSink implements Sink<byte[]> {
+
+    private MqttSinkConfig mqttSinkConfig;
+    private Mqtt5AsyncClient mqttClient;
+    private MqttQos mqttQos;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
+        mqttSinkConfig = MqttSinkConfig.load(config, sinkContext);
+        mqttSinkConfig.validate();
+        mqttQos = MqttQos.fromCode(mqttSinkConfig.getQos());
+
+        var builder = MqttClient.builder()
+                .useMqttVersion5()
+                .serverHost(mqttSinkConfig.getServerHost())
+                .serverPort(mqttSinkConfig.getServerPort());
+
+        if (StringUtils.isNotBlank(mqttSinkConfig.getClientId())) {
+            builder = builder.identifier(mqttSinkConfig.getClientId());
+        }
+        if (mqttSinkConfig.isSslEnabled()) {
+            builder = builder.sslWithDefaultConfig();
+        }
+
+        mqttClient = buildClient(builder);
+        if (StringUtils.isNotBlank(mqttSinkConfig.getUsername())) {
+            var authBuilder = mqttClient.connectWith()
+                    .cleanStart(mqttSinkConfig.isCleanStart())
+                    .keepAlive(mqttSinkConfig.getKeepAliveIntervalSec())
+                    .simpleAuth()
+                    .username(mqttSinkConfig.getUsername());
+            if (mqttSinkConfig.getPassword() != null) {
+                authBuilder = 
authBuilder.password(mqttSinkConfig.getPassword().getBytes(StandardCharsets.UTF_8));
+            }
+            authBuilder.applySimpleAuth()
+                    .send()
+                    .get(mqttSinkConfig.getConnectionTimeoutMs(), 
TimeUnit.MILLISECONDS);
+        } else {
+            mqttClient.connectWith()
+                    .cleanStart(mqttSinkConfig.isCleanStart())
+                    .keepAlive(mqttSinkConfig.getKeepAliveIntervalSec())
+                    .send()
+                    .get(mqttSinkConfig.getConnectionTimeoutMs(), 
TimeUnit.MILLISECONDS);
+        }
+        log.info("MQTT sink connected to {}:{}.",
+                mqttSinkConfig.getServerHost(), 
mqttSinkConfig.getServerPort());
+    }
+
+    Mqtt5AsyncClient buildClient(Mqtt5ClientBuilder builder) {
+        return builder.buildAsync();
+    }
+
+    @Override
+    public void write(Record<byte[]> record) {
+        try {
+            byte[] payload = record.getValue() == null ? new byte[0] : 
record.getValue();
+            mqttClient.publishWith()
+                    .topic(mqttSinkConfig.getTopic())
+                    .qos(mqttQos)
+                    .payload(payload)
+                    .send()
+                    .whenComplete((result, throwable) -> {
+                        if (throwable == null) {
+                            record.ack();
+                        } else {
+                            record.fail();
+                            log.warn("Failed to publish message to MQTT topic 
{}",
+                                    mqttSinkConfig.getTopic(), throwable);
+                        }
+                    });
+        } catch (Exception e) {
+            record.fail();
+            log.warn("Failed to schedule MQTT publish for topic {}", 
mqttSinkConfig.getTopic(), e);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (mqttClient == null) {
+            return;
+        }
+
+        try {
+            mqttClient.disconnectWith()
+                    .send()
+                    .get(mqttSinkConfig.getConnectionTimeoutMs(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("MQTT disconnect was interrupted", e);
+        } catch (Exception e) {
+            log.warn("Failed to disconnect MQTT client cleanly", e);
+        }
+    }
+}
diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java 
b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java
new file mode 100644
index 00000000..646462d0
--- /dev/null
+++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+/**
+ * Configuration of the MQTT sink connector. Instances are loaded from a YAML file or from
+ * the connector's config map and should be checked with {@link #validate()} before use.
+ */
+@Data
+@Accessors(chain = true)
+public class MqttSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "The MQTT broker host.")
+    private String serverHost;
+
+    @FieldDoc(
+            defaultValue = "1883",
+            help = "The MQTT broker port.")
+    private int serverPort = 1883;
+
+    @FieldDoc(
+            required = true,
+            defaultValue = "",
+            help = "The MQTT topic to publish messages to.")
+    private String topic;
+
+    @FieldDoc(
+            defaultValue = "",
+            help = "MQTT client id used for the broker connection.")
+    private String clientId;
+
+    @FieldDoc(
+            defaultValue = "",
+            sensitive = true,
+            help = "MQTT username.")
+    private String username;
+
+    @FieldDoc(
+            defaultValue = "",
+            sensitive = true,
+            help = "MQTT password.")
+    private String password;
+
+    @FieldDoc(
+            defaultValue = "0",
+            help = "MQTT QoS level for outgoing messages. Valid values: 0, 1, 2.")
+    private int qos = 0;
+
+    @FieldDoc(
+            defaultValue = "60",
+            help = "MQTT keep alive interval in seconds.")
+    private int keepAliveIntervalSec = 60;
+
+    @FieldDoc(
+            defaultValue = "10000",
+            help = "Timeout in milliseconds for MQTT connect/disconnect operations.")
+    private long connectionTimeoutMs = 10000L;
+
+    @FieldDoc(
+            defaultValue = "true",
+            help = "Whether to start with a clean session.")
+    private boolean cleanStart = true;
+
+    @FieldDoc(
+            defaultValue = "false",
+            help = "Enable SSL/TLS with the client default SSL configuration.")
+    private boolean sslEnabled = false;
+
+    /**
+     * Reads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the parsed configuration (not yet validated)
+     * @throws IOException if the file cannot be read or parsed
+     */
+    public static MqttSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), MqttSinkConfig.class);
+    }
+
+    /**
+     * Builds the configuration from the connector's config map via
+     * {@code IOConfigUtils.loadWithSecrets}, passing the sink context along for secret lookup.
+     *
+     * @param map         raw connector configuration
+     * @param sinkContext sink context handed to the loader
+     * @return the loaded configuration (not yet validated)
+     * @throws IOException declared for callers; propagated from the loader
+     */
+    public static MqttSinkConfig load(Map<String, Object> map, SinkContext sinkContext) throws IOException {
+        return IOConfigUtils.loadWithSecrets(map, MqttSinkConfig.class, sinkContext);
+    }
+
+    /**
+     * Checks that the configuration is usable before a client is built from it.
+     *
+     * @throws IllegalArgumentException if a required field is blank, a numeric field is out
+     *                                  of range, or a password is given without a username
+     */
+    public void validate() {
+        Preconditions.checkArgument(StringUtils.isNotBlank(serverHost), "serverHost cannot be blank");
+        Preconditions.checkArgument(serverPort > 0, "serverPort must be a positive integer");
+        // TCP ports are 16-bit; reject larger values here instead of failing later in the client builder.
+        Preconditions.checkArgument(serverPort <= 65535, "serverPort must be <= 65535");
+        Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank");
+        Preconditions.checkArgument(StringUtils.isNotBlank(username) || StringUtils.isBlank(password),
+                "password cannot be set when username is blank");
+        Preconditions.checkArgument(qos >= 0 && qos <= 2, "qos must be one of 0, 1, 2");
+        Preconditions.checkArgument(keepAliveIntervalSec >= 0, "keepAliveIntervalSec must be >= 0");
+        // The MQTT keep alive is a two-byte unsigned integer on the wire.
+        Preconditions.checkArgument(keepAliveIntervalSec <= 65535, "keepAliveIntervalSec must be <= 65535");
+        Preconditions.checkArgument(connectionTimeoutMs > 0, "connectionTimeoutMs must be > 0");
+    }
+}
diff --git a/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml 
b/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 00000000..b93d81af
--- /dev/null
+++ b/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: mqtt
+description: MQTT sink connector
+sinkClass: org.apache.pulsar.io.mqtt.MqttSink
+sinkConfigClass: org.apache.pulsar.io.mqtt.MqttSinkConfig
diff --git 
a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java 
b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java
new file mode 100644
index 00000000..0707f0a7
--- /dev/null
+++ b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for loading {@link MqttSinkConfig} from YAML and from a config map, and for
+ * the checks performed by {@code validate()}.
+ */
+public class MqttSinkConfigTest {
+
+    /** Loads {@code sinkConfig.yaml} from the test classpath and checks every field. */
+    @Test
+    public void loadFromYamlFileTest() throws Exception {
+        File yamlFile = getFile("sinkConfig.yaml");
+        MqttSinkConfig config = 
MqttSinkConfig.load(yamlFile.getAbsolutePath());
+        assertNotNull(config);
+        assertEquals(config.getServerHost(), "localhost");
+        assertEquals(config.getServerPort(), 1883);
+        assertEquals(config.getTopic(), "test/topic");
+        assertEquals(config.getClientId(), "pulsar-mqtt-test");
+        assertEquals(config.getUsername(), "mqtt-user");
+        assertEquals(config.getPassword(), "mqtt-password");
+        assertEquals(config.getQos(), 1);
+        assertEquals(config.getKeepAliveIntervalSec(), 45);
+        assertEquals(config.getConnectionTimeoutMs(), 15000L);
+        assertTrue(config.isCleanStart());
+        assertFalse(config.isSslEnabled());
+    }
+
+    /** Loads the base config map with a mocked sink context and spot-checks the result. */
+    @Test
+    public void loadFromMapTest() throws IOException {
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(), 
sinkContext);
+
+        assertNotNull(config);
+        assertEquals(config.getServerHost(), "localhost");
+        assertEquals(config.getServerPort(), 1883);
+        assertEquals(config.getTopic(), "test/topic");
+        assertEquals(config.getClientId(), "pulsar-mqtt-test");
+        assertEquals(config.getQos(), 1);
+    }
+
+    /**
+     * Removes the credentials from the map and stubs {@code SinkContext.getSecret} to
+     * return them, then checks that the loaded config carries the secret values.
+     */
+    @Test
+    public void loadFromMapCredentialsFromSecretTest() throws IOException {
+        Map<String, Object> map = baseConfigMap();
+        map.remove("username");
+        map.remove("password");
+
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        
Mockito.when(sinkContext.getSecret("username")).thenReturn("mqtt-user");
+        
Mockito.when(sinkContext.getSecret("password")).thenReturn("mqtt-password");
+
+        MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext);
+        assertEquals(config.getUsername(), "mqtt-user");
+        assertEquals(config.getPassword(), "mqtt-password");
+    }
+
+    /** The base config map must pass validation without throwing. */
+    @Test
+    public void validValidateTest() throws IOException {
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(), 
sinkContext);
+        config.validate();
+    }
+
+    /** A QoS of 3 is outside the accepted 0..2 range and must be rejected. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "qos must be one of 0, 1, 2")
+    public void invalidQosValidateTest() throws IOException {
+        Map<String, Object> map = baseConfigMap();
+        map.put("qos", 3);
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext);
+        config.validate();
+    }
+
+    /** Blanking the username while the base map still has a password must be rejected. */
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "password cannot be set when 
username is blank")
+    public void passwordWithoutUsernameValidateTest() throws IOException {
+        Map<String, Object> map = baseConfigMap();
+        map.put("username", "");
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext);
+        config.validate();
+    }
+
+    /** Returns a fresh, mutable config map that sets every {@link MqttSinkConfig} field. */
+    private static Map<String, Object> baseConfigMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("serverHost", "localhost");
+        map.put("serverPort", 1883);
+        map.put("topic", "test/topic");
+        map.put("clientId", "pulsar-mqtt-test");
+        map.put("username", "mqtt-user");
+        map.put("password", "mqtt-password");
+        map.put("qos", 1);
+        map.put("keepAliveIntervalSec", 45);
+        map.put("connectionTimeoutMs", 15000);
+        map.put("cleanStart", true);
+        map.put("sslEnabled", false);
+        return map;
+    }
+
+    /** Resolves a test resource by name from the class loader; fails fast if it is missing. */
+    private File getFile(String name) throws URISyntaxException {
+        URL resource = 
Objects.requireNonNull(getClass().getClassLoader().getResource(name),
+                "Missing test resource: " + name);
+        return Paths.get(resource.toURI()).toFile();
+    }
+}
diff --git a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java 
b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java
new file mode 100644
index 00000000..326d5a97
--- /dev/null
+++ b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+public class MqttSinkTest { // MqttSink write/open/close tests; the MQTT client is mocked where one is needed
+
+    @Test
+    public void writeShouldCallFailWhenPublishThrowsSynchronously() { // sync publish error -> record.fail()
+        Mqtt5AsyncClient mqttClient = mock(Mqtt5AsyncClient.class);
+        when(mqttClient.publishWith()).thenThrow(new RuntimeException("publish failed"));
+        MqttSink sink = newSinkWithOpenedClient(mqttClient);
+        TestRecord record = new TestRecord("x".getBytes(), new CountDownLatch(1), new AtomicBoolean(false));
+
+        sink.write(record); // must not let the RuntimeException escape
+
+        assertTrue(record.isFailed(), "record.fail() should be called");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "password cannot be set when username is blank")
+    public void openShouldPropagateConfigValidationFailure() throws Exception {
+        Map<String, Object> invalidConfig = baseConfigMap();
+        invalidConfig.put("username", ""); // blank username ...
+        invalidConfig.put("password", "pwd"); // ... combined with a password is the invalid case
+        try (MqttSink sink = new MqttSink()) { // sink is closed even though open() is expected to throw
+            sink.open(invalidConfig, mock(SinkContext.class));
+        }
+    }
+
+    @Test
+    public void closeShouldBeSafeWhenSinkWasNeverOpened() {
+        new MqttSink().close(); // passes as long as no exception is thrown
+    }
+
+    private MqttSink newSinkWithOpenedClient(Mqtt5AsyncClient mqttClient) { // opens a sink around the mock
+        try {
+            @SuppressWarnings("unchecked")
+            Mqtt5ConnectBuilder.Send<CompletableFuture<Mqtt5ConnAck>> connectBuilder =
+                    mock(Mqtt5ConnectBuilder.Send.class, Mockito.RETURNS_SELF); // builder calls return the mock
+            when(mqttClient.connectWith()).thenReturn(connectBuilder);
+            when(connectBuilder.send())
+                    .thenReturn(CompletableFuture.completedFuture(null)); // connect future is already completed
+
+            MqttSink sink = Mockito.spy(new MqttSink());
+            doReturn(mqttClient).when(sink).buildClient(any()); // stub buildClient to hand back the mock client
+            sink.open(baseConfigMap(), mock(SinkContext.class));
+            return sink;
+        } catch (Exception e) {
+            throw new AssertionError("Failed to initialize MqttSink test fixture", e);
+        }
+    }
+
+    private static Map<String, Object> baseConfigMap() { // config without clientId, username or password
+        Map<String, Object> config = new HashMap<>();
+        config.put("serverHost", "localhost");
+        config.put("serverPort", 1883);
+        config.put("topic", "test/topic");
+        config.put("qos", 1);
+        config.put("connectionTimeoutMs", 1000L);
+        config.put("keepAliveIntervalSec", 60);
+        config.put("cleanStart", true);
+        return config;
+    }
+
+    private static final class TestRecord implements Record<byte[]> { // Record stub capturing ack()/fail() calls
+        private final byte[] value;
+        private final CountDownLatch ackLatch;
+        private final AtomicBoolean failCalled;
+
+        private TestRecord(byte[] value, CountDownLatch ackLatch, AtomicBoolean failCalled) {
+            this.value = value;
+            this.ackLatch = ackLatch;
+            this.failCalled = failCalled;
+        }
+
+        @Override
+        public byte[] getValue() {
+            return value;
+        }
+
+        @Override
+        public void ack() {
+            ackLatch.countDown(); // each ack counts the latch down once
+        }
+
+        @Override
+        public void fail() {
+            failCalled.set(true); // remembered for isFailed()
+        }
+
+        private boolean isFailed() {
+            return failCalled.get();
+        }
+    }
+}
diff --git a/mqtt/src/test/resources/sinkConfig.yaml b/mqtt/src/test/resources/sinkConfig.yaml
new file mode 100644
index 00000000..69eb3f7e
--- /dev/null
+++ b/mqtt/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+serverHost: localhost
+serverPort: 1883
+topic: test/topic
+clientId: pulsar-mqtt-test
+username: mqtt-user
+password: mqtt-password
+qos: 1
+keepAliveIntervalSec: 45
+connectionTimeoutMs: 15000
+cleanStart: true
+sslEnabled: false
diff --git a/settings.gradle.kts b/settings.gradle.kts
index d186f01b..cdef2105 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -86,6 +86,7 @@ include("nsq")
 include("rabbitmq")
 include("redis")
 include("solr")
+include("mqtt")
 
 // JDBC — parent + sub-modules with qualified names to avoid clashes with debezium
 include("jdbc")


Reply via email to