This is an automated email from the ASF dual-hosted git repository.
david-streamlio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new 05379359 [feat][io] Add MQTT sink connector (#22)
05379359 is described below
commit 05379359e2d339a30099039ceeb2f84f50863387
Author: Dream95 <[email protected]>
AuthorDate: Sun May 10 23:29:20 2026 +0800
[feat][io] Add MQTT sink connector (#22)
* [feat][io] Add MQTT sink connector
Signed-off-by: Dream95 <[email protected]>
* [fix][io] Stabilize MQTT sink tests and split integration coverage
Signed-off-by: Dream95 <[email protected]>
* [fix][io] Handle disconnect interruption and correct serverPort FieldDoc
Signed-off-by: Dream95 <[email protected]>
---------
Signed-off-by: Dream95 <[email protected]>
---
README.md | 1 +
distribution/io/build.gradle.kts | 1 +
gradle/libs.versions.toml | 2 +
mqtt/build.gradle.kts | 53 ++++++++
.../pulsar/io/mqtt/MqttSinkIntegrationTest.java | 150 +++++++++++++++++++++
.../java/org/apache/pulsar/io/mqtt/MqttSink.java | 136 +++++++++++++++++++
.../org/apache/pulsar/io/mqtt/MqttSinkConfig.java | 119 ++++++++++++++++
.../resources/META-INF/services/pulsar-io.yaml | 22 +++
.../apache/pulsar/io/mqtt/MqttSinkConfigTest.java | 133 ++++++++++++++++++
.../org/apache/pulsar/io/mqtt/MqttSinkTest.java | 129 ++++++++++++++++++
mqtt/src/test/resources/sinkConfig.yaml | 29 ++++
settings.gradle.kts | 1 +
12 files changed, 776 insertions(+)
diff --git a/README.md b/README.md
index ea0694f7..174bf57e 100644
--- a/README.md
+++ b/README.md
@@ -58,6 +58,7 @@ mounting them into the `apachepulsar/pulsar` Docker image.
| Kafka | Apache Kafka |
| Kinesis | Amazon Kinesis Data Streams |
| MongoDB | MongoDB |
+| MQTT | MQTT broker |
| Redis | Redis |
| Solr | Apache Solr |
diff --git a/distribution/io/build.gradle.kts b/distribution/io/build.gradle.kts
index 9aa7b814..66f3a6c7 100644
--- a/distribution/io/build.gradle.kts
+++ b/distribution/io/build.gradle.kts
@@ -65,6 +65,7 @@ dependencies {
connectorNars(project(":influxdb"))
connectorNars(project(":redis"))
connectorNars(project(":solr"))
+ connectorNars(project(":mqtt"))
connectorNars(project(":dynamodb"))
connectorNars(project(":alluxio"))
connectorNars(project(":azure-data-explorer"))
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index e397b65b..ca155662 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -176,6 +176,7 @@ aerospike-client = "4.5.0"
aws-sdk = "1.12.788"
aws-sdk2 = "2.32.28"
rabbitmq-client = "5.28.0"
+hivemq-mqtt-client = "1.3.13"
cassandra-driver = "3.11.2"
mongodb-driver = "5.4.0"
influxdb-client = "7.3.0"
@@ -494,6 +495,7 @@ solr-test-framework = { module =
"org.apache.solr:solr-test-framework", version.
solr-core = { module = "org.apache.solr:solr-core", version.ref = "solr" }
# Messaging
rabbitmq-amqp-client = { module = "com.rabbitmq:amqp-client", version.ref =
"rabbitmq-client" }
+hivemq-mqtt-client = { module = "com.hivemq:hivemq-mqtt-client", version.ref =
"hivemq-mqtt-client" }
nsq-j = { module = "com.sproutsocial:nsq-j", version.ref = "nsq-client" }
# Time series
influxdb-client-java = { module = "com.influxdb:influxdb-client-java",
version.ref = "influxdb-client" }
diff --git a/mqtt/build.gradle.kts b/mqtt/build.gradle.kts
new file mode 100644
index 00000000..671a8380
--- /dev/null
+++ b/mqtt/build.gradle.kts
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Applies the shared Java and NAR convention plugins of this repository.
+plugins {
+ id("pulsar-connectors.java-conventions")
+ id("pulsar-connectors.nar-conventions")
+}
+
+// Dedicated source set for the Docker-based tests under src/integrationTest.
+// It compiles and runs against the main output plus the regular test classpaths.
+val integrationTest by sourceSets.creating {
+ compileClasspath += sourceSets.main.get().output +
configurations.testCompileClasspath.get()
+ runtimeClasspath += output + sourceSets.main.get().output +
configurations.testRuntimeClasspath.get()
+ // Shared test resources kept at the root of the repository.
+ resources.srcDir(rootProject.file("gradle/test-resources"))
+}
+
+// Integration tests inherit every dependency declared for the unit tests.
+configurations[integrationTest.implementationConfigurationName].extendsFrom(configurations.testImplementation.get())
+configurations[integrationTest.runtimeOnlyConfigurationName].extendsFrom(configurations.testRuntimeOnly.get())
+
+dependencies {
+ implementation(libs.pulsar.io.common)
+ implementation(libs.pulsar.io.core)
+ implementation(libs.pulsar.functions.instance)
+ implementation(libs.jackson.databind)
+ implementation(libs.jackson.dataformat.yaml)
+ implementation(libs.commons.lang3)
+ implementation(libs.guava)
+ implementation(libs.hivemq.mqtt.client)
+
+ // Testcontainers is only put on the integration-test classpath.
+ add(integrationTest.implementationConfigurationName, libs.testcontainers)
+}
+
+// Separate verification task for the integration tests; it is only ordered after
+// "test" (shouldRunAfter), not made a dependency of it.
+// NOTE(review): the test sources use TestNG annotations; presumably the java-conventions
+// plugin enables TestNG for every Test task, including this one - confirm.
+tasks.register<Test>("integrationTest") {
+ description = "Runs MQTT integration tests that require Docker."
+ group = LifecycleBasePlugin.VERIFICATION_GROUP
+ testClassesDirs = integrationTest.output.classesDirs
+ classpath = integrationTest.runtimeClasspath
+ shouldRunAfter("test")
+}
diff --git
a/mqtt/src/integrationTest/java/org/apache/pulsar/io/mqtt/MqttSinkIntegrationTest.java
b/mqtt/src/integrationTest/java/org/apache/pulsar/io/mqtt/MqttSinkIntegrationTest.java
new file mode 100644
index 00000000..85b34c4e
--- /dev/null
+++
b/mqtt/src/integrationTest/java/org/apache/pulsar/io/mqtt/MqttSinkIntegrationTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.hivemq.client.mqtt.MqttClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MqttSinkIntegrationTest {
+
+ private static final Logger log =
LoggerFactory.getLogger(MqttSinkIntegrationTest.class);
+
+ private static final int MQTT_PORT = 1883;
+ private static final String TEST_TOPIC = "pulsar/mqtt/e2e";
+ private static final DockerImageName MOSQUITTO_IMAGE =
DockerImageName.parse("eclipse-mosquitto:2");
+
+ private final GenericContainer<?> mqttContainer = new
GenericContainer<>(MOSQUITTO_IMAGE)
+ .withExposedPorts(MQTT_PORT);
+
+ @BeforeClass(alwaysRun = true)
+ public void beforeClass() {
+ mqttContainer.start();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void afterClass() {
+ mqttContainer.stop();
+ }
+
+    /**
+     * End-to-end check against the Mosquitto container: three records written through the sink
+     * must each be acked, never failed, and must reach an MQTT subscriber in publish order.
+     *
+     * @throws Exception if connecting, subscribing, opening the sink or waiting is interrupted or fails
+     */
+    @Test
+    public void testWriteE2EWithMosquitto() throws Exception {
+        BlockingQueue<String> receivedPayloads = new LinkedBlockingQueue<>();
+        CountDownLatch ackLatch = new CountDownLatch(3);
+        AtomicBoolean failCalled = new AtomicBoolean(false);
+
+        Mqtt5AsyncClient subscriber = MqttClient.builder()
+                .useMqttVersion5()
+                .serverHost(mqttContainer.getHost())
+                .serverPort(mqttContainer.getMappedPort(MQTT_PORT))
+                .identifier("mqtt-sink-e2e-subscriber")
+                .buildAsync();
+
+        // Every broker interaction happens inside the try block so that the subscriber is
+        // always disconnected in the finally clause, even when connecting or subscribing fails.
+        try {
+            subscriber.connectWith()
+                    .cleanStart(true)
+                    .send()
+                    .get(10, TimeUnit.SECONDS);
+            subscriber.subscribeWith()
+                    .topicFilter(TEST_TOPIC)
+                    .callback(publish -> receivedPayloads.add(
+                            new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8)))
+                    .send()
+                    .get(10, TimeUnit.SECONDS);
+
+            Map<String, Object> config = new HashMap<>();
+            config.put("serverHost", mqttContainer.getHost());
+            config.put("serverPort", mqttContainer.getMappedPort(MQTT_PORT));
+            config.put("topic", TEST_TOPIC);
+            config.put("qos", 1);
+            config.put("connectionTimeoutMs", 10000);
+            config.put("clientId", "mqtt-sink-e2e-publisher");
+
+            SinkContext sinkContext = mock(SinkContext.class);
+            try (MqttSink sink = new MqttSink()) {
+                sink.open(config, sinkContext);
+
+                for (int i = 0; i < 3; i++) {
+                    sink.write(new TestRecord(("msg-" + i).getBytes(StandardCharsets.UTF_8), ackLatch, failCalled));
+                }
+
+                assertTrue(ackLatch.await(10, TimeUnit.SECONDS), "Timed out waiting for record.ack()");
+                assertFalse(failCalled.get(), "record.fail() should not be called on successful publish");
+
+                assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-0");
+                assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-1");
+                assertEquals(receivedPayloads.poll(10, TimeUnit.SECONDS), "msg-2");
+            }
+        } finally {
+            // Best-effort cleanup: a failed disconnect must not mask the real test outcome.
+            try {
+                subscriber.disconnectWith()
+                        .sessionExpiryInterval(0)
+                        .send()
+                        .get(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                log.warn("Failed to disconnect MQTT subscriber in test cleanup", e);
+            }
+        }
+    }
+
+ private static final class TestRecord implements Record<byte[]> {
+ private final byte[] value;
+ private final CountDownLatch ackLatch;
+ private final AtomicBoolean failCalled;
+
+ private TestRecord(byte[] value, CountDownLatch ackLatch,
AtomicBoolean failCalled) {
+ this.value = value;
+ this.ackLatch = ackLatch;
+ this.failCalled = failCalled;
+ }
+
+ @Override
+ public byte[] getValue() {
+ return value;
+ }
+
+ @Override
+ public void ack() {
+ ackLatch.countDown();
+ }
+
+ @Override
+ public void fail() {
+ failCalled.set(true);
+ }
+ }
+}
diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java
b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java
new file mode 100644
index 00000000..cf7eee40
--- /dev/null
+++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSink.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import com.hivemq.client.mqtt.MqttClient;
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+@Connector(
+ name = "mqtt",
+ type = IOType.SINK,
+ help = "A sink connector that moves messages from Pulsar to MQTT.",
+ configClass = MqttSinkConfig.class
+)
+@Slf4j
+public class MqttSink implements Sink<byte[]> {
+
+ private MqttSinkConfig mqttSinkConfig;
+ private Mqtt5AsyncClient mqttClient;
+ private MqttQos mqttQos;
+
+    /**
+     * Loads and validates the sink configuration, builds the MQTT 5 client and blocks until the
+     * broker connection is established or {@code connectionTimeoutMs} elapses.
+     *
+     * <p>Simple authentication is only applied when a non-blank username is configured; the
+     * password is added to it only when it is not {@code null}.
+     *
+     * @param config      raw connector configuration
+     * @param sinkContext context handed to the configuration loader
+     * @throws Exception if the configuration is invalid or the connect attempt fails or times out
+     */
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        mqttSinkConfig = MqttSinkConfig.load(config, sinkContext);
+        mqttSinkConfig.validate();
+        mqttQos = MqttQos.fromCode(mqttSinkConfig.getQos());
+
+        Mqtt5ClientBuilder clientBuilder = MqttClient.builder()
+                .useMqttVersion5()
+                .serverHost(mqttSinkConfig.getServerHost())
+                .serverPort(mqttSinkConfig.getServerPort());
+
+        final String clientId = mqttSinkConfig.getClientId();
+        if (StringUtils.isNotBlank(clientId)) {
+            clientBuilder = clientBuilder.identifier(clientId);
+        }
+        if (mqttSinkConfig.isSslEnabled()) {
+            clientBuilder = clientBuilder.sslWithDefaultConfig();
+        }
+        mqttClient = buildClient(clientBuilder);
+
+        // Options shared by the authenticated and the anonymous connect.
+        var connect = mqttClient.connectWith()
+                .cleanStart(mqttSinkConfig.isCleanStart())
+                .keepAlive(mqttSinkConfig.getKeepAliveIntervalSec());
+
+        if (StringUtils.isNotBlank(mqttSinkConfig.getUsername())) {
+            var credentials = connect.simpleAuth().username(mqttSinkConfig.getUsername());
+            final String password = mqttSinkConfig.getPassword();
+            if (password != null) {
+                credentials = credentials.password(password.getBytes(StandardCharsets.UTF_8));
+            }
+            connect = credentials.applySimpleAuth();
+        }
+
+        // Block so that open() only returns once the broker has accepted the connection.
+        connect.send().get(mqttSinkConfig.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS);
+
+        log.info("MQTT sink connected to {}:{}.",
+                mqttSinkConfig.getServerHost(), mqttSinkConfig.getServerPort());
+    }
+
+ Mqtt5AsyncClient buildClient(Mqtt5ClientBuilder builder) {
+ return builder.buildAsync();
+ }
+
+ @Override
+ public void write(Record<byte[]> record) {
+ try {
+ byte[] payload = record.getValue() == null ? new byte[0] :
record.getValue();
+ mqttClient.publishWith()
+ .topic(mqttSinkConfig.getTopic())
+ .qos(mqttQos)
+ .payload(payload)
+ .send()
+ .whenComplete((result, throwable) -> {
+ if (throwable == null) {
+ record.ack();
+ } else {
+ record.fail();
+ log.warn("Failed to publish message to MQTT topic
{}",
+ mqttSinkConfig.getTopic(), throwable);
+ }
+ });
+ } catch (Exception e) {
+ record.fail();
+ log.warn("Failed to schedule MQTT publish for topic {}",
mqttSinkConfig.getTopic(), e);
+ }
+ }
+
+    /**
+     * Disconnects from the broker, waiting at most {@code connectionTimeoutMs}. Calling this
+     * method on a sink that was never opened, or calling it more than once, is a no-op.
+     * Disconnect failures are logged and not rethrown.
+     */
+    @Override
+    public void close() {
+        final Mqtt5AsyncClient client = mqttClient;
+        if (client == null) {
+            return;
+        }
+        // Clear the field before disconnecting so that a repeated close() does not try to
+        // disconnect an already closed client and log a spurious warning.
+        mqttClient = null;
+
+        try {
+            client.disconnectWith()
+                    .send()
+                    .get(mqttSinkConfig.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag for whoever is shutting the sink down.
+            Thread.currentThread().interrupt();
+            log.warn("MQTT disconnect was interrupted", e);
+        } catch (Exception e) {
+            log.warn("Failed to disconnect MQTT client cleanly", e);
+        }
+    }
+}
diff --git a/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java
b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java
new file mode 100644
index 00000000..646462d0
--- /dev/null
+++ b/mqtt/src/main/java/org/apache/pulsar/io/mqtt/MqttSinkConfig.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+@Data
+@Accessors(chain = true)
+public class MqttSinkConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The MQTT broker host.")
+ private String serverHost;
+
+ @FieldDoc(
+ defaultValue = "1883",
+ help = "The MQTT broker port.")
+ private int serverPort = 1883;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The MQTT topic to publish messages to.")
+ private String topic;
+
+ @FieldDoc(
+ defaultValue = "",
+ help = "MQTT client id used for the broker connection.")
+ private String clientId;
+
+ @FieldDoc(
+ defaultValue = "",
+ sensitive = true,
+ help = "MQTT username.")
+ private String username;
+
+ @FieldDoc(
+ defaultValue = "",
+ sensitive = true,
+ help = "MQTT password.")
+ private String password;
+
+ @FieldDoc(
+ defaultValue = "0",
+ help = "MQTT QoS level for outgoing messages. Valid values: 0, 1,
2.")
+ private int qos = 0;
+
+ @FieldDoc(
+ defaultValue = "60",
+ help = "MQTT keep alive interval in seconds.")
+ private int keepAliveIntervalSec = 60;
+
+ @FieldDoc(
+ defaultValue = "10000",
+ help = "Timeout in milliseconds for MQTT connect/disconnect
operations.")
+ private long connectionTimeoutMs = 10000L;
+
+ @FieldDoc(
+ defaultValue = "true",
+ help = "Whether to start with a clean session.")
+ private boolean cleanStart = true;
+
+ @FieldDoc(
+ defaultValue = "false",
+ help = "Enable SSL/TLS with the client default SSL configuration.")
+ private boolean sslEnabled = false;
+
+    /**
+     * Reads a sink configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file
+     * @return the configuration deserialized from the file
+     * @throws IOException if the file cannot be read or mapped to {@link MqttSinkConfig}
+     */
+    public static MqttSinkConfig load(String yamlFile) throws IOException {
+        final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
+        final File source = new File(yamlFile);
+        return yamlMapper.readValue(source, MqttSinkConfig.class);
+    }
+
+    /**
+     * Builds a sink configuration from the connector config map via
+     * {@link IOConfigUtils#loadWithSecrets}, passing the sink context along.
+     *
+     * @param map         raw connector configuration
+     * @param sinkContext context handed to {@code loadWithSecrets}
+     * @return the populated configuration
+     * @throws IOException declared for loading failures
+     */
+    public static MqttSinkConfig load(Map<String, Object> map, SinkContext sinkContext) throws IOException {
+        final MqttSinkConfig loaded = IOConfigUtils.loadWithSecrets(map, MqttSinkConfig.class, sinkContext);
+        return loaded;
+    }
+
+    /**
+     * Checks that the configuration is usable before a connection is attempted.
+     *
+     * <p>Besides the mandatory host and topic, this rejects a password without a username, a QoS
+     * outside 0..2, a non-positive timeout, and port or keep-alive values that do not fit into
+     * 16 bits.
+     *
+     * @throws IllegalArgumentException if any setting is missing or out of range
+     */
+    public void validate() {
+        Preconditions.checkArgument(StringUtils.isNotBlank(serverHost), "serverHost cannot be blank");
+        Preconditions.checkArgument(serverPort > 0, "serverPort must be a positive integer");
+        // TCP port numbers are 16-bit, so anything above 65535 can never be connected to.
+        Preconditions.checkArgument(serverPort <= 65535, "serverPort must be <= 65535");
+        Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank");
+        Preconditions.checkArgument(StringUtils.isNotBlank(username) || StringUtils.isBlank(password),
+                "password cannot be set when username is blank");
+        Preconditions.checkArgument(qos >= 0 && qos <= 2, "qos must be one of 0, 1, 2");
+        Preconditions.checkArgument(keepAliveIntervalSec >= 0, "keepAliveIntervalSec must be >= 0");
+        // MQTT encodes the keep-alive interval as a two-byte integer (0..65535 seconds).
+        Preconditions.checkArgument(keepAliveIntervalSec <= 65535, "keepAliveIntervalSec must be <= 65535");
+        Preconditions.checkArgument(connectionTimeoutMs > 0, "connectionTimeoutMs must be > 0");
+    }
+}
diff --git a/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml
b/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 00000000..b93d81af
--- /dev/null
+++ b/mqtt/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+name: mqtt
+description: MQTT sink connector
+sinkClass: org.apache.pulsar.io.mqtt.MqttSink
+sinkConfigClass: org.apache.pulsar.io.mqtt.MqttSinkConfig
diff --git
a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java
b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java
new file mode 100644
index 00000000..0707f0a7
--- /dev/null
+++ b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkConfigTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+public class MqttSinkConfigTest {
+
+ @Test
+ public void loadFromYamlFileTest() throws Exception {
+ File yamlFile = getFile("sinkConfig.yaml");
+ MqttSinkConfig config =
MqttSinkConfig.load(yamlFile.getAbsolutePath());
+ assertNotNull(config);
+ assertEquals(config.getServerHost(), "localhost");
+ assertEquals(config.getServerPort(), 1883);
+ assertEquals(config.getTopic(), "test/topic");
+ assertEquals(config.getClientId(), "pulsar-mqtt-test");
+ assertEquals(config.getUsername(), "mqtt-user");
+ assertEquals(config.getPassword(), "mqtt-password");
+ assertEquals(config.getQos(), 1);
+ assertEquals(config.getKeepAliveIntervalSec(), 45);
+ assertEquals(config.getConnectionTimeoutMs(), 15000L);
+ assertTrue(config.isCleanStart());
+ assertFalse(config.isSslEnabled());
+ }
+
+ @Test
+ public void loadFromMapTest() throws IOException {
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(),
sinkContext);
+
+ assertNotNull(config);
+ assertEquals(config.getServerHost(), "localhost");
+ assertEquals(config.getServerPort(), 1883);
+ assertEquals(config.getTopic(), "test/topic");
+ assertEquals(config.getClientId(), "pulsar-mqtt-test");
+ assertEquals(config.getQos(), 1);
+ }
+
+ @Test
+ public void loadFromMapCredentialsFromSecretTest() throws IOException {
+ Map<String, Object> map = baseConfigMap();
+ map.remove("username");
+ map.remove("password");
+
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+
Mockito.when(sinkContext.getSecret("username")).thenReturn("mqtt-user");
+
Mockito.when(sinkContext.getSecret("password")).thenReturn("mqtt-password");
+
+ MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext);
+ assertEquals(config.getUsername(), "mqtt-user");
+ assertEquals(config.getPassword(), "mqtt-password");
+ }
+
+ @Test
+ public void validValidateTest() throws IOException {
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ MqttSinkConfig config = MqttSinkConfig.load(baseConfigMap(),
sinkContext);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "qos must be one of 0, 1, 2")
+ public void invalidQosValidateTest() throws IOException {
+ Map<String, Object> map = baseConfigMap();
+ map.put("qos", 3);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "password cannot be set when
username is blank")
+ public void passwordWithoutUsernameValidateTest() throws IOException {
+ Map<String, Object> map = baseConfigMap();
+ map.put("username", "");
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ MqttSinkConfig config = MqttSinkConfig.load(map, sinkContext);
+ config.validate();
+ }
+
+ private static Map<String, Object> baseConfigMap() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("serverHost", "localhost");
+ map.put("serverPort", 1883);
+ map.put("topic", "test/topic");
+ map.put("clientId", "pulsar-mqtt-test");
+ map.put("username", "mqtt-user");
+ map.put("password", "mqtt-password");
+ map.put("qos", 1);
+ map.put("keepAliveIntervalSec", 45);
+ map.put("connectionTimeoutMs", 15000);
+ map.put("cleanStart", true);
+ map.put("sslEnabled", false);
+ return map;
+ }
+
+ private File getFile(String name) throws URISyntaxException {
+ URL resource =
Objects.requireNonNull(getClass().getClassLoader().getResource(name),
+ "Missing test resource: " + name);
+ return Paths.get(resource.toURI()).toFile();
+ }
+}
diff --git a/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java
b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java
new file mode 100644
index 00000000..326d5a97
--- /dev/null
+++ b/mqtt/src/test/java/org/apache/pulsar/io/mqtt/MqttSinkTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mqtt;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+public class MqttSinkTest {
+
+    /**
+     * When the client throws while the publish is being set up, {@code write} must not
+     * propagate the exception and must mark the record as failed.
+     */
+    @Test
+    public void writeShouldCallFailWhenPublishThrowsSynchronously() {
+        Mqtt5AsyncClient mqttClient = mock(Mqtt5AsyncClient.class);
+        when(mqttClient.publishWith()).thenThrow(new RuntimeException("publish failed"));
+        MqttSink sink = newSinkWithOpenedClient(mqttClient);
+        // Explicit byte literal instead of String.getBytes(), which uses the platform default charset.
+        TestRecord record = new TestRecord(new byte[] {'x'}, new CountDownLatch(1), new AtomicBoolean(false));
+
+        sink.write(record);
+
+        assertTrue(record.isFailed(), "record.fail() should be called");
+    }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "password cannot be set when
username is blank")
+ public void openShouldPropagateConfigValidationFailure() throws Exception {
+ Map<String, Object> invalidConfig = baseConfigMap();
+ invalidConfig.put("username", "");
+ invalidConfig.put("password", "pwd");
+ try (MqttSink sink = new MqttSink()) {
+ sink.open(invalidConfig, mock(SinkContext.class));
+ }
+ }
+
+ @Test
+ public void closeShouldBeSafeWhenSinkWasNeverOpened() {
+ new MqttSink().close();
+ }
+
+ private MqttSink newSinkWithOpenedClient(Mqtt5AsyncClient mqttClient) {
+ try {
+ @SuppressWarnings("unchecked")
+ Mqtt5ConnectBuilder.Send<CompletableFuture<Mqtt5ConnAck>>
connectBuilder =
+ mock(Mqtt5ConnectBuilder.Send.class, Mockito.RETURNS_SELF);
+ when(mqttClient.connectWith()).thenReturn(connectBuilder);
+ when(connectBuilder.send())
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ MqttSink sink = Mockito.spy(new MqttSink());
+ doReturn(mqttClient).when(sink).buildClient(any());
+ sink.open(baseConfigMap(), mock(SinkContext.class));
+ return sink;
+ } catch (Exception e) {
+ throw new AssertionError("Failed to initialize MqttSink test
fixture", e);
+ }
+ }
+
+ private static Map<String, Object> baseConfigMap() {
+ Map<String, Object> config = new HashMap<>();
+ config.put("serverHost", "localhost");
+ config.put("serverPort", 1883);
+ config.put("topic", "test/topic");
+ config.put("qos", 1);
+ config.put("connectionTimeoutMs", 1000L);
+ config.put("keepAliveIntervalSec", 60);
+ config.put("cleanStart", true);
+ return config;
+ }
+
+ private static final class TestRecord implements Record<byte[]> {
+ private final byte[] value;
+ private final CountDownLatch ackLatch;
+ private final AtomicBoolean failCalled;
+
+ private TestRecord(byte[] value, CountDownLatch ackLatch,
AtomicBoolean failCalled) {
+ this.value = value;
+ this.ackLatch = ackLatch;
+ this.failCalled = failCalled;
+ }
+
+ @Override
+ public byte[] getValue() {
+ return value;
+ }
+
+ @Override
+ public void ack() {
+ ackLatch.countDown();
+ }
+
+ @Override
+ public void fail() {
+ failCalled.set(true);
+ }
+
+ private boolean isFailed() {
+ return failCalled.get();
+ }
+ }
+}
diff --git a/mqtt/src/test/resources/sinkConfig.yaml
b/mqtt/src/test/resources/sinkConfig.yaml
new file mode 100644
index 00000000..69eb3f7e
--- /dev/null
+++ b/mqtt/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+serverHost: localhost
+serverPort: 1883
+topic: test/topic
+clientId: pulsar-mqtt-test
+username: mqtt-user
+password: mqtt-password
+qos: 1
+keepAliveIntervalSec: 45
+connectionTimeoutMs: 15000
+cleanStart: true
+sslEnabled: false
diff --git a/settings.gradle.kts b/settings.gradle.kts
index d186f01b..cdef2105 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -86,6 +86,7 @@ include("nsq")
include("rabbitmq")
include("redis")
include("solr")
+include("mqtt")
// JDBC — parent + sub-modules with qualified names to avoid clashes with
debezium
include("jdbc")