This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fe82c3db7d9 Enable the kernel to support passing topics to MQTT custom plugins (#15523)
fe82c3db7d9 is described below

commit fe82c3db7d9e135553cdd27ea1bc318b8bf53800
Author: wenyanshi-123 <[email protected]>
AuthorDate: Tue May 20 10:23:41 2025 +0800

    Enable the kernel to support passing topics to MQTT custom plugins (#15523)
---
 .../iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java  |  9 ++++++++-
 .../iotdb/db/protocol/mqtt/JSONPayloadFormatter.java       |  9 ++++++++-
 .../iotdb/db/protocol/mqtt/LinePayloadFormatter.java       | 14 +++++++++++++-
 .../apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java   | 11 ++---------
 .../apache/iotdb/db/protocol/mqtt/PayloadFormatter.java    | 12 ++++++++++++
 .../iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java   | 12 ++++++++----
 .../iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java   |  9 ++++++---
 7 files changed, 57 insertions(+), 19 deletions(-)

diff --git a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
index fec5b97b6fb..2c56b233660 100644
--- a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
+++ b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
 import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.NotImplementedException;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,7 +34,7 @@ import java.util.List;
 public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
 
   @Override
-  public List<Message> format(ByteBuf payload) {
+  public List<Message> format(String topic, ByteBuf payload) {
     // Suppose the payload is a json format
     if (payload == null) {
       return Collections.emptyList();
@@ -54,6 +55,12 @@ public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
     return ret;
   }
 
+  @Override
+  @Deprecated
+  public List<Message> format(ByteBuf payload) {
+    throw new NotImplementedException();
+  }
+
   @Override
   public String getName() {
     // set the value of mqtt_payload_formatter in iotdb-common.properties as the following string:
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
index 258c98d6364..745fbd7215b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
@@ -27,6 +27,7 @@ import com.google.gson.JsonObject;
 import com.google.gson.JsonParseException;
 import com.google.gson.reflect.TypeToken;
 import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.tsfile.enums.TSDataType;
 
 import java.nio.charset.StandardCharsets;
@@ -50,7 +51,7 @@ public class JSONPayloadFormatter implements PayloadFormatter {
   private static final Gson GSON = new GsonBuilder().create();
 
   @Override
-  public List<Message> format(ByteBuf payload) {
+  public List<Message> format(String topic, ByteBuf payload) {
     if (payload == null) {
       return new ArrayList<>();
     }
@@ -81,6 +82,12 @@ public class JSONPayloadFormatter implements PayloadFormatter {
     throw new JsonParseException("payload is invalidate");
   }
 
+  @Override
+  @Deprecated
+  public List<Message> format(ByteBuf payload) {
+    throw new NotImplementedException();
+  }
+
   private List<Message> formatJson(JsonObject jsonObject) {
     TreeMessage message = new TreeMessage();
     message.setDevice(jsonObject.get(JSON_KEY_DEVICE).getAsString());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
index 8e389f78760..63767a7a450 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.protocol.mqtt;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
@@ -63,7 +64,7 @@ public class LinePayloadFormatter implements PayloadFormatter {
   }
 
   @Override
-  public List<Message> format(ByteBuf payload) {
+  public List<Message> format(String topic, ByteBuf payload) {
     List<Message> messages = new ArrayList<>();
     if (payload == null) {
       return messages;
@@ -71,6 +72,8 @@ public class LinePayloadFormatter implements PayloadFormatter {
 
     String txt = payload.toString(StandardCharsets.UTF_8);
     String[] lines = txt.split(LINE_BREAK);
+    // '/' previously defined as a database name
+    String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/"));
     for (String line : lines) {
       if (line.trim().startsWith(WELL)) {
         continue;
@@ -83,6 +86,9 @@ public class LinePayloadFormatter implements PayloadFormatter {
           continue;
         }
 
+        // Parsing Database Name
+        message.setDatabase((database));
+
         // Parsing Table Names
         message.setTable(matcher.group(TABLE));
 
@@ -121,6 +127,12 @@ public class LinePayloadFormatter implements PayloadFormatter {
     return messages;
   }
 
+  @Override
+  @Deprecated
+  public List<Message> format(ByteBuf payload) {
+    throw new NotImplementedException();
+  }
+
   private boolean setTags(Matcher matcher, TableMessage message) {
     List<String> tagKeys = new ArrayList<>();
     List<Object> tagValues = new ArrayList<>();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 288efb85d6d..ad97dd310c3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -136,7 +136,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
           topic,
           payload);
 
-      List<Message> messages = payloadFormat.format(payload);
+      List<Message> messages = payloadFormat.format(topic, payload);
       if (messages == null) {
         return;
       }
@@ -146,14 +146,7 @@ public class MPPPublishHandler extends AbstractInterceptHandler {
           continue;
         }
         if (useTableInsert) {
-          TableMessage tableMessage = (TableMessage) message;
-          // '/' previously defined as a database name
-          String database =
-              !msg.getTopicName().contains("/")
-                  ? msg.getTopicName()
-                  : msg.getTopicName().substring(0, msg.getTopicName().indexOf("/"));
-          tableMessage.setDatabase(database);
-          insertTable(tableMessage, session);
+          insertTable((TableMessage) message, session);
         } else {
           insertTree((TreeMessage) message, session);
         }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
index 278d6eb3743..c86648ac161 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
@@ -40,8 +40,20 @@ public interface PayloadFormatter {
    * @param payload
    * @return
    */
+  @Deprecated
   List<Message> format(ByteBuf payload);
 
+  /**
+   * format a payload of a topic to a list of messages
+   *
+   * @param topic
+   * @param payload
+   * @return
+   */
+  default List<Message> format(String topic, ByteBuf payload) {
+    return format(payload);
+  }
+
   /**
    * get the formatter name
    *
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
index bc721406d9a..deecf607d81 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
@@ -38,9 +38,10 @@ public class JSONPayloadFormatterTest {
             + " }";
 
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
 
     JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    TreeMessage message = (TreeMessage) formatter.format(buf).get(0);
+    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(0);
 
     assertEquals("root.sg.d1", message.getDevice());
     assertEquals(Long.valueOf(1586076045524L), message.getTimestamp());
@@ -59,9 +60,10 @@ public class JSONPayloadFormatterTest {
             + "  }";
 
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
 
     JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
+    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);
 
     assertEquals("root.sg.d1", message.getDevice());
     assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
@@ -88,9 +90,10 @@ public class JSONPayloadFormatterTest {
             + "]";
 
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
 
     JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
+    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1);
 
     assertEquals("root.sg.d2", message.getDevice());
     assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
@@ -117,9 +120,10 @@ public class JSONPayloadFormatterTest {
             + "]";
 
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
 
     JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    TreeMessage message = (TreeMessage) formatter.format(buf).get(3);
+    TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(3);
 
     assertEquals("root.sg.d2", message.getDevice());
     assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
index 5651ca49b97..7bf9bce0702 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
@@ -37,9 +37,10 @@ public class LinePayloadFormatterTest {
         "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f 1";
 
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
 
     LinePayloadFormatter formatter = new LinePayloadFormatter();
-    TableMessage message = (TableMessage) formatter.format(buf).get(0);
+    TableMessage message = (TableMessage) formatter.format(topic, buf).get(0);
 
     assertEquals("test1", message.getTable());
     assertEquals(Long.valueOf(1L), message.getTimestamp());
@@ -64,9 +65,10 @@ public class LinePayloadFormatterTest {
             + "test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 field4=\"value4\",field5=10i,field6=10i32 2 ";
 
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
 
     LinePayloadFormatter formatter = new LinePayloadFormatter();
-    TableMessage message = (TableMessage) formatter.format(buf).get(1);
+    TableMessage message = (TableMessage) formatter.format(topic, buf).get(1);
 
     assertEquals("test2", message.getTable());
     assertEquals(Long.valueOf(2L), message.getTimestamp());
@@ -82,9 +84,10 @@ public class LinePayloadFormatterTest {
             + " # test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 field4=\"value4\",field5=10i,field6=10i32 2 ";
 
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+    String topic = "";
 
     LinePayloadFormatter formatter = new LinePayloadFormatter();
-    List<Message> message = formatter.format(buf);
+    List<Message> message = formatter.format(topic, buf);
 
     assertEquals(1, message.size());
   }

Reply via email to