This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fe82c3db7d9 Enable the kernel to support passing topics to MQTT custom
plugins (#15523)
fe82c3db7d9 is described below
commit fe82c3db7d9e135553cdd27ea1bc318b8bf53800
Author: wenyanshi-123 <[email protected]>
AuthorDate: Tue May 20 10:23:41 2025 +0800
Enable the kernel to support passing topics to MQTT custom plugins (#15523)
---
.../iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java | 9 ++++++++-
.../iotdb/db/protocol/mqtt/JSONPayloadFormatter.java | 9 ++++++++-
.../iotdb/db/protocol/mqtt/LinePayloadFormatter.java | 14 +++++++++++++-
.../apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java | 11 ++---------
.../apache/iotdb/db/protocol/mqtt/PayloadFormatter.java | 12 ++++++++++++
.../iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java | 12 ++++++++----
.../iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java | 9 ++++++---
7 files changed, 57 insertions(+), 19 deletions(-)
diff --git
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
index fec5b97b6fb..2c56b233660 100644
---
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
+++
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.NotImplementedException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,7 +34,7 @@ import java.util.List;
public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
@Override
- public List<Message> format(ByteBuf payload) {
+ public List<Message> format(String topic, ByteBuf payload) {
// Suppose the payload is a json format
if (payload == null) {
return Collections.emptyList();
@@ -54,6 +55,12 @@ public class CustomizedJsonPayloadFormatter implements
PayloadFormatter {
return ret;
}
+ @Override
+ @Deprecated
+ public List<Message> format(ByteBuf payload) {
+ throw new NotImplementedException();
+ }
+
@Override
public String getName() {
// set the value of mqtt_payload_formatter in iotdb-common.properties as
the following string:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
index 258c98d6364..745fbd7215b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
@@ -27,6 +27,7 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.tsfile.enums.TSDataType;
import java.nio.charset.StandardCharsets;
@@ -50,7 +51,7 @@ public class JSONPayloadFormatter implements PayloadFormatter
{
private static final Gson GSON = new GsonBuilder().create();
@Override
- public List<Message> format(ByteBuf payload) {
+ public List<Message> format(String topic, ByteBuf payload) {
if (payload == null) {
return new ArrayList<>();
}
@@ -81,6 +82,12 @@ public class JSONPayloadFormatter implements
PayloadFormatter {
throw new JsonParseException("payload is invalidate");
}
+ @Override
+ @Deprecated
+ public List<Message> format(ByteBuf payload) {
+ throw new NotImplementedException();
+ }
+
private List<Message> formatJson(JsonObject jsonObject) {
TreeMessage message = new TreeMessage();
message.setDevice(jsonObject.get(JSON_KEY_DEVICE).getAsString());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
index 8e389f78760..63767a7a450 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.protocol.mqtt;
import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
@@ -63,7 +64,7 @@ public class LinePayloadFormatter implements PayloadFormatter
{
}
@Override
- public List<Message> format(ByteBuf payload) {
+ public List<Message> format(String topic, ByteBuf payload) {
List<Message> messages = new ArrayList<>();
if (payload == null) {
return messages;
@@ -71,6 +72,8 @@ public class LinePayloadFormatter implements PayloadFormatter
{
String txt = payload.toString(StandardCharsets.UTF_8);
String[] lines = txt.split(LINE_BREAK);
+ // The part of the topic before the first '/' (or the whole topic if there is none) is the database name
+ String database = !topic.contains("/") ? topic : topic.substring(0,
topic.indexOf("/"));
for (String line : lines) {
if (line.trim().startsWith(WELL)) {
continue;
@@ -83,6 +86,9 @@ public class LinePayloadFormatter implements PayloadFormatter
{
continue;
}
+ // Parsing Database Name
+ message.setDatabase((database));
+
// Parsing Table Names
message.setTable(matcher.group(TABLE));
@@ -121,6 +127,12 @@ public class LinePayloadFormatter implements
PayloadFormatter {
return messages;
}
+ @Override
+ @Deprecated
+ public List<Message> format(ByteBuf payload) {
+ throw new NotImplementedException();
+ }
+
private boolean setTags(Matcher matcher, TableMessage message) {
List<String> tagKeys = new ArrayList<>();
List<Object> tagValues = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 288efb85d6d..ad97dd310c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -136,7 +136,7 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
topic,
payload);
- List<Message> messages = payloadFormat.format(payload);
+ List<Message> messages = payloadFormat.format(topic, payload);
if (messages == null) {
return;
}
@@ -146,14 +146,7 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
continue;
}
if (useTableInsert) {
- TableMessage tableMessage = (TableMessage) message;
- // '/' previously defined as a database name
- String database =
- !msg.getTopicName().contains("/")
- ? msg.getTopicName()
- : msg.getTopicName().substring(0,
msg.getTopicName().indexOf("/"));
- tableMessage.setDatabase(database);
- insertTable(tableMessage, session);
+ insertTable((TableMessage) message, session);
} else {
insertTree((TreeMessage) message, session);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
index 278d6eb3743..c86648ac161 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
@@ -40,8 +40,20 @@ public interface PayloadFormatter {
* @param payload
* @return
*/
+ @Deprecated
List<Message> format(ByteBuf payload);
+ /**
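+ * Formats a payload published to a topic into a list of messages.
+ *
+ * @param topic the topic the payload was published to
+ * @param payload the payload to format
+ * @return the messages parsed from the payload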
+ */
+ default List<Message> format(String topic, ByteBuf payload) {
+ return format(payload);
+ }
+
/**
* get the formatter name
*
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
index bc721406d9a..deecf607d81 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
@@ -38,9 +38,10 @@ public class JSONPayloadFormatterTest {
+ " }";
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+ String topic = "";
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
- TreeMessage message = (TreeMessage) formatter.format(buf).get(0);
+ TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(0);
assertEquals("root.sg.d1", message.getDevice());
assertEquals(Long.valueOf(1586076045524L), message.getTimestamp());
@@ -59,9 +60,10 @@ public class JSONPayloadFormatterTest {
+ " }";
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+ String topic = "";
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
- TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
+ TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);
assertEquals("root.sg.d1", message.getDevice());
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
@@ -88,9 +90,10 @@ public class JSONPayloadFormatterTest {
+ "]";
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+ String topic = "";
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
- TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
+ TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);
assertEquals("root.sg.d2", message.getDevice());
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
@@ -117,9 +120,10 @@ public class JSONPayloadFormatterTest {
+ "]";
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+ String topic = "";
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
- TreeMessage message = (TreeMessage) formatter.format(buf).get(3);
+ TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(3);
assertEquals("root.sg.d2", message.getDevice());
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
index 5651ca49b97..7bf9bce0702 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
@@ -37,9 +37,10 @@ public class LinePayloadFormatterTest {
"test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2
field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f
1";
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+ String topic = "";
LinePayloadFormatter formatter = new LinePayloadFormatter();
- TableMessage message = (TableMessage) formatter.format(buf).get(0);
+ TableMessage message = (TableMessage) formatter.format(topic, buf).get(0);
assertEquals("test1", message.getTable());
assertEquals(Long.valueOf(1L), message.getTimestamp());
@@ -64,9 +65,10 @@ public class LinePayloadFormatterTest {
+ "test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4
field4=\"value4\",field5=10i,field6=10i32 2 ";
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+ String topic = "";
LinePayloadFormatter formatter = new LinePayloadFormatter();
- TableMessage message = (TableMessage) formatter.format(buf).get(1);
+ TableMessage message = (TableMessage) formatter.format(topic, buf).get(1);
assertEquals("test2", message.getTable());
assertEquals(Long.valueOf(2L), message.getTimestamp());
@@ -82,9 +84,10 @@ public class LinePayloadFormatterTest {
+ " # test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4
field4=\"value4\",field5=10i,field6=10i32 2 ";
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+ String topic = "";
LinePayloadFormatter formatter = new LinePayloadFormatter();
- List<Message> message = formatter.format(buf);
+ List<Message> message = formatter.format(topic, buf);
assertEquals(1, message.size());
}