This is an automated email from the ASF dual-hosted git repository.

critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b5e5b00e4c [To Master] MQTT Extensions to the Table Model Using the 
Row Protocol (#14848)
6b5e5b00e4c is described below

commit 6b5e5b00e4c1b73fc69fa5cbd6a693f36083fffa
Author: ppppoooo <[email protected]>
AuthorDate: Fri Feb 21 18:25:45 2025 +0800

    [To Master] MQTT Extensions to the Table Model Using the Row Protocol 
(#14848)
    
    * mqtt
    
    * It
    
    * it
    
    * youhua
    
    * exclude
    
    * it
    
    * Update pom.xml
    
    * dataPath
    
    * Update pom.xml
    
    * it
    
    * rmset
    
    * rename
    
    * rename
    
    * zhushi
    
    ---------
    
    Co-authored-by: xz m <[email protected]>
    Co-authored-by: CritasWang <[email protected]>
---
 .../server/CustomizedJsonPayloadFormatter.java     |   2 +-
 .../org/apache/iotdb/mqtt/server/MyMessage.java    |  31 +--
 .../java/org/apache/iotdb/mqtt/MQTTClient.java     |  39 +++-
 integration-test/pom.xml                           |   4 +
 .../iotdb/it/env/cluster/ClusterConstant.java      |   1 +
 .../it/env/cluster/config/MppCommonConfig.java     |   6 +
 .../it/env/cluster/config/MppDataNodeConfig.java   |  12 +
 .../env/cluster/config/MppSharedCommonConfig.java  |   7 +
 .../iotdb/it/env/cluster/node/DataNodeWrapper.java |   3 +
 .../it/env/remote/config/RemoteCommonConfig.java   |   5 +
 .../it/env/remote/config/RemoteDataNodeConfig.java |  10 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   2 +
 .../apache/iotdb/itbase/env/DataNodeConfig.java    |   4 +
 .../relational/it/mqtt/IoTDBMQTTServiceIT.java     | 132 +++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  13 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 +
 .../db/protocol/mqtt/JSONPayloadFormatter.java     |   6 +-
 .../db/protocol/mqtt/LinePayloadFormatter.java     | 252 +++++++++++++++++++++
 .../iotdb/db/protocol/mqtt/MPPPublishHandler.java  | 239 ++++++++++++++-----
 .../org/apache/iotdb/db/protocol/mqtt/Message.java |  58 +----
 .../iotdb/db/protocol/mqtt/TableMessage.java       | 144 ++++++++++++
 .../mqtt/{Message.java => TreeMessage.java}        |  16 +-
 .../org/apache/iotdb/db/service/MQTTService.java   |   2 +
 ....apache.iotdb.db.protocol.mqtt.PayloadFormatter |   1 +
 .../db/protocol/mqtt/JSONPayloadFormatterTest.java |   8 +-
 .../db/protocol/mqtt/LinePayloadFormatterTest.java |  91 ++++++++
 .../db/protocol/mqtt/PayloadFormatManagerTest.java |   2 +-
 .../conf/iotdb-system.properties.template          |   3 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   1 +
 29 files changed, 942 insertions(+), 156 deletions(-)

diff --git 
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
 
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
index 5b708f1bcd5..171074d62a6 100644
--- 
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
+++ 
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
@@ -43,7 +43,7 @@ public class CustomizedJsonPayloadFormatter implements 
PayloadFormatter {
     // this is just an example, so we just generate some Messages directly
     for (int i = 0; i < 2; i++) {
       long ts = i;
-      Message message = new Message();
+      MyMessage message = new MyMessage();
       message.setDevice("d" + i);
       message.setTimestamp(ts);
       message.setMeasurements(Arrays.asList("s1", "s2"));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
 
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
similarity index 68%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
copy to 
example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
index 5ce81e55c15..d0ab7e08b4e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
+++ 
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
@@ -7,25 +7,26 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
+package org.apache.iotdb.mqtt.server;
 
-package org.apache.iotdb.db.protocol.mqtt;
+import org.apache.iotdb.db.protocol.mqtt.Message;
 
 import org.apache.tsfile.enums.TSDataType;
 
 import java.util.List;
 
-/** Message describes the information sometime sent from the devices. */
-public class Message {
+public class MyMessage extends Message {
+
   private String device;
-  private Long timestamp;
   private List<String> measurements;
   private List<TSDataType> dataTypes;
   private List<String> values;
@@ -38,14 +39,6 @@ public class Message {
     this.device = device;
   }
 
-  public Long getTimestamp() {
-    return timestamp;
-  }
-
-  public void setTimestamp(Long timestamp) {
-    this.timestamp = timestamp;
-  }
-
   public List<String> getMeasurements() {
     return measurements;
   }
@@ -77,7 +70,7 @@ public class Message {
         + device
         + '\''
         + ", timestamp="
-        + timestamp
+        + super.timestamp
         + ", measurements="
         + measurements
         + ", values="
diff --git a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java 
b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
index e0efc473937..fad59224e6f 100644
--- a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
+++ b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
@@ -25,6 +25,9 @@ import org.fusesource.mqtt.client.QoS;
 import java.util.Random;
 
 public class MQTTClient {
+
+  private static final String DATABASE = "myMqttTest";
+
   public static void main(String[] args) throws Exception {
     MQTT mqtt = new MQTT();
     mqtt.setHost("127.0.0.1", 1883);
@@ -35,7 +38,14 @@ public class MQTTClient {
 
     BlockingConnection connection = mqtt.blockingConnection();
     connection.connect();
+    // the config mqttPayloadFormatter must be tree-json
+    // jsonPayloadFormatter(connection);
+    // the config mqttPayloadFormatter must be table-line
+    linePayloadFormatter(connection);
+    connection.disconnect();
+  }
 
+  private static void jsonPayloadFormatter(BlockingConnection connection) 
throws Exception {
     Random random = new Random();
     StringBuilder sb = new StringBuilder();
     for (int i = 0; i < 10; i++) {
@@ -58,7 +68,34 @@ public class MQTTClient {
     sb.insert(0, "[");
     sb.replace(sb.lastIndexOf(","), sb.length(), "]");
     connection.publish("root.sg.d1.s1", sb.toString().getBytes(), 
QoS.AT_LEAST_ONCE, false);
+  }
 
-    connection.disconnect();
+  // The database must be created in advance
+  private static void linePayloadFormatter(BlockingConnection connection) 
throws Exception {
+
+    String payload =
+        "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 
field1=\"fieldValue1\",field2=1i,field3=1u 1";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+
+    payload = "test1,tag1=t1,tag2=t2  field4=2,field5=2i32,field6=2f 2";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+
+    payload =
+        "test1,tag1=t1,tag2=t2  field7=t,field8=T,field9=true 3 \n "
+            + "test1,tag1=t1,tag2=t2  field7=f,field8=F,field9=FALSE 4";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+
+    payload =
+        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"fieldValue1\",field2=1i,field3=1u 4 \n "
+            + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
+
+    payload = "# It's a remark\n " + "test1,tag1=t1,tag2=t2 
field4=2,field5=2i32,field6=2f 6";
+    connection.publish(DATABASE + "/myTopic", payload.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+    Thread.sleep(10);
   }
 }
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 6438c36e3d5..e2165fe611f 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -225,6 +225,10 @@
             <artifactId>jcip-annotations</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.fusesource.mqtt-client</groupId>
+            <artifactId>mqtt-client</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
index 2516bf66561..3162f47ebe8 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
@@ -164,6 +164,7 @@ public class ClusterConstant {
 
   public static final String MQTT_HOST = "mqtt_host";
   public static final String MQTT_PORT = "mqtt_port";
+  public static final String MQTT_DATA_PATH = "mqtt_data_path";
   public static final String UDF_LIB_DIR = "udf_lib_dir";
   public static final String TRIGGER_LIB_DIR = "trigger_lib_dir";
   public static final String PIPE_LIB_DIR = "pipe_lib_dir";
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index af1771c1be6..8e8fd199eda 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -321,6 +321,12 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
+    setProperty("mqtt_payload_formatter", 
String.valueOf(mqttPayloadFormatter));
+    return this;
+  }
+
   @Override
   public CommonConfig setSchemaEngineMode(String schemaEngineMode) {
     setProperty("schema_engine_mode", schemaEngineMode);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 3d6675c2f46..19dada3d06a 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -83,4 +83,16 @@ public class MppDataNodeConfig extends MppBaseConfig 
implements DataNodeConfig {
         "compaction_schedule_interval_in_ms", 
String.valueOf(compactionScheduleInterval));
     return this;
   }
+
+  @Override
+  public DataNodeConfig setEnableMQTTService(boolean enableMQTTService) {
+    setProperty("enable_mqtt_service", String.valueOf(enableMQTTService));
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
+    setProperty("mqtt_payload_formatter", 
String.valueOf(mqttPayloadFormatter));
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index ddfdaa7d698..24c5a3d5b4d 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -323,6 +323,13 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     return this;
   }
 
+  @Override
+  public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
+    cnConfig.setMqttPayloadFormatter(mqttPayloadFormatter);
+    dnConfig.setMqttPayloadFormatter(mqttPayloadFormatter);
+    return this;
+  }
+
   @Override
   public CommonConfig setSchemaEngineMode(String schemaEngineMode) {
     cnConfig.setSchemaEngineMode(schemaEngineMode);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index 76c81ffb3bb..1fe2d264269 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -58,6 +58,7 @@ import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.DN_WAL_DIRS;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.IOTDB_SYSTEM_PROPERTIES_FILE;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.MAIN_CLASS_NAME;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.MAX_TSBLOCK_SIZE_IN_BYTES;
+import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_DATA_PATH;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_HOST;
 import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_PORT;
 import static 
org.apache.iotdb.it.env.cluster.ClusterConstant.PAGE_SIZE_IN_BYTE;
@@ -115,6 +116,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     // Override mqtt properties of super class
     immutableCommonProperties.setProperty(MQTT_HOST, super.getIp());
     immutableCommonProperties.setProperty(MQTT_PORT, 
String.valueOf(this.mqttPort));
+    immutableCommonProperties.setProperty(
+        MQTT_DATA_PATH, getNodePath() + File.separator + "mqttData");
     immutableCommonProperties.setProperty(
         PIPE_AIR_GAP_RECEIVER_PORT, 
String.valueOf(this.pipeAirGapReceiverPort));
 
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index c97b3df98ca..300c2b109cd 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -226,6 +226,11 @@ public class RemoteCommonConfig implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
+    return this;
+  }
+
   @Override
   public CommonConfig setSchemaEngineMode(String schemaEngineMode) {
     return this;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 7ef4d9dd8cb..b109baa8203 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -48,4 +48,14 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
   public DataNodeConfig setCompactionScheduleInterval(long 
compactionScheduleInterval) {
     return this;
   }
+
+  @Override
+  public DataNodeConfig setEnableMQTTService(boolean enableMQTTService) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 0b604bf97a9..0f547443d77 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -104,6 +104,8 @@ public interface CommonConfig {
 
   CommonConfig setEnableMQTTService(boolean enableMQTTService);
 
+  CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter);
+
   CommonConfig setSchemaEngineMode(String schemaEngineMode);
 
   CommonConfig setSelectIntoInsertTabletPlanRowLimit(int 
selectIntoInsertTabletPlanRowLimit);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index ad68ae74a86..b8ad5c2f15b 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -33,4 +33,8 @@ public interface DataNodeConfig {
       long loadTsFileAnalyzeSchemaMemorySizeInBytes);
 
   DataNodeConfig setCompactionScheduleInterval(long 
compactionScheduleInterval);
+
+  DataNodeConfig setEnableMQTTService(boolean enableMQTTService);
+
+  DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter);
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
new file mode 100644
index 00000000000..fcf8af41327
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.relational.it.mqtt;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import org.apache.tsfile.read.common.Field;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBMQTTServiceIT {
+  private BlockingConnection connection;
+  private static final String IP = System.getProperty("RemoteIp", "127.0.0.1");
+  private static final String USER = System.getProperty("RemoteUser", "root");
+  private static final String PASSWORD = System.getProperty("RemotePassword", 
"root");
+  private static final String DATABASE = "mqtttest";
+  public static final String FORMATTER = "table-line";
+
+  @Before
+  public void setUp() throws Exception {
+    BaseEnv baseEnv = EnvFactory.getEnv();
+    baseEnv.getConfig().getDataNodeConfig().setEnableMQTTService(true);
+    baseEnv.getConfig().getDataNodeConfig().setMqttPayloadFormatter(FORMATTER);
+    baseEnv.initClusterEnvironment();
+    DataNodeWrapper portConflictDataNodeWrapper = 
EnvFactory.getEnv().getDataNodeWrapper(0);
+    int port = portConflictDataNodeWrapper.getMqttPort();
+    MQTT mqtt = new MQTT();
+    mqtt.setHost(IP, port);
+    mqtt.setUserName(USER);
+    mqtt.setPassword(PASSWORD);
+    mqtt.setConnectAttemptsMax(3);
+    mqtt.setReconnectDelay(10);
+
+    connection = mqtt.blockingConnection();
+    connection.connect();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (connection != null) {
+        connection.disconnect();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testNoAttr() throws Exception {
+    try (final ITableSession session =
+        EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
+      session.executeNonQueryStatement("CREATE DATABASE " + DATABASE);
+      String payload1 = "test1,tag1=t1,tag2=t2 field1=1,field2=1f,field3=1i32 
1";
+      connection.publish(DATABASE + "/myTopic", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+      Thread.sleep(1000);
+      try (final SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select tag1,tag2,field1,field2,field3 from test1 where time = 
1")) {
+        assertEquals(5, dataSet.getColumnNames().size());
+        List<Field> fields = dataSet.next().getFields();
+        assertEquals("t1", fields.get(0).getStringValue());
+        assertEquals("t2", fields.get(1).getStringValue());
+        assertEquals(1d, fields.get(2).getDoubleV(), 0);
+        assertEquals(1f, fields.get(3).getFloatV(), 0);
+        assertEquals(1, fields.get(4).getIntV(), 0);
+      }
+    }
+  }
+
+  @Test
+  public void testWithAttr() throws Exception {
+    try (final ITableSession session =
+        EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
+      session.executeNonQueryStatement("CREATE DATABASE " + DATABASE);
+      String payload1 = "test2,tag1=t1,tag2=t2 attr3=a3,attr4=a4 
field1=1,field2=1f,field3=1i32 1";
+      connection.publish(DATABASE + "/myTopic", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+      Thread.sleep(1000);
+      try (final SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select tag1,tag2,attr3,attr4,field1,field2,field3 from test2 
where time = 1")) {
+        assertEquals(7, dataSet.getColumnNames().size());
+        List<Field> fields = dataSet.next().getFields();
+        assertEquals("t1", fields.get(0).getStringValue());
+        assertEquals("t2", fields.get(1).getStringValue());
+        assertEquals("a3", fields.get(2).getStringValue());
+        assertEquals("a4", fields.get(3).getStringValue());
+        assertEquals(1d, fields.get(4).getDoubleV(), 0);
+        assertEquals(1f, fields.get(5).getFloatV(), 0);
+        assertEquals(1, fields.get(6).getIntV(), 0);
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c8a904909d4..2ff675b0196 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -113,7 +113,10 @@ public class IoTDBConfig {
   private int mqttHandlerPoolSize = 1;
 
   /** The mqtt message payload formatter. */
-  private String mqttPayloadFormatter = "json";
+  private String mqttPayloadFormatter = "tree-json";
+
+  /** The mqtt save data path */
+  private String mqttDataPath = "data/";
 
   /** Max mqtt message size. Unit: byte */
   private int mqttMaxMessageSize = 1048576;
@@ -2726,6 +2729,14 @@ public class IoTDBConfig {
     this.mqttPayloadFormatter = mqttPayloadFormatter;
   }
 
+  public String getMqttDataPath() {
+    return mqttDataPath;
+  }
+
+  public void setMqttDataPath(String mqttDataPath) {
+    this.mqttDataPath = mqttDataPath;
+  }
+
   public int getMqttMaxMessageSize() {
     return mqttMaxMessageSize;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5d3b9fd5131..8184435eadd 100755
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1854,6 +1854,10 @@ public class IoTDBDescriptor {
           
properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME).trim());
     }
 
+    if (properties.getProperty(IoTDBConstant.MQTT_DATA_PATH) != null) {
+      
conf.setMqttDataPath(properties.getProperty(IoTDBConstant.MQTT_DATA_PATH).trim());
+    }
+
     if (properties.getProperty(IoTDBConstant.ENABLE_MQTT) != null) {
       conf.setEnableMQTTService(
           
Boolean.parseBoolean(properties.getProperty(IoTDBConstant.ENABLE_MQTT).trim()));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
index 524b0a4718d..9d0af6af6c5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
@@ -80,7 +80,7 @@ public class JSONPayloadFormatter implements PayloadFormatter 
{
   }
 
   private List<Message> formatJson(JsonObject jsonObject) {
-    Message message = new Message();
+    TreeMessage message = new TreeMessage();
     message.setDevice(jsonObject.get(JSON_KEY_DEVICE).getAsString());
     message.setTimestamp(jsonObject.get(JSON_KEY_TIMESTAMP).getAsLong());
     message.setMeasurements(
@@ -106,7 +106,7 @@ public class JSONPayloadFormatter implements 
PayloadFormatter {
 
     List<Message> ret = new ArrayList<>(timestamps.size());
     for (int i = 0; i < timestamps.size(); i++) {
-      Message message = new Message();
+      TreeMessage message = new TreeMessage();
       message.setDevice(device);
       message.setTimestamp(timestamps.get(i));
       message.setMeasurements(measurements);
@@ -118,6 +118,6 @@ public class JSONPayloadFormatter implements 
PayloadFormatter {
 
   @Override
   public String getName() {
-    return "json";
+    return "tree-json";
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
new file mode 100644
index 00000000000..62481e0fcb0
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * The Line payload formatter. myTable,tag1=value1,tag2=value2 
attr1=value1,attr2=value2
+ * fieldKey="fieldValue" 1740109006000 \n myTable,tag1=value1,tag2=value2 
fieldKey="fieldValue"
+ * 1740109006000
+ */
+public class LinePayloadFormatter implements PayloadFormatter {
+
+  private static final Logger log = 
LoggerFactory.getLogger(LinePayloadFormatter.class);
+
+  /*
+  Regular expression matching line protocol ,the attributes field is not 
required
+  */
+  private static final String REGEX =
+      "(?<table>\\w+)(,(?<tags>[^ ]+))?(\\s+(?<attributes>[^ 
]+))?\\s+(?<fields>[^ ]+)\\s+(?<timestamp>\\d+)";
+  private static final String COMMA = ",";
+  private static final String WELL = "#";
+  private static final String LINE_BREAK = "\n";
+  private static final String EQUAL = "=";
+  private static final String TABLE = "table";
+  private static final String TAGS = "tags";
+  private static final String ATTRIBUTES = "attributes";
+  private static final String FIELDS = "fields";
+  private static final String TIMESTAMP = "timestamp";
+  private static final String NULL = "null";
+
+  private final Pattern pattern;
+
+  public LinePayloadFormatter() {
+    pattern = Pattern.compile(REGEX);
+  }
+
+  @Override
+  public List<Message> format(ByteBuf payload) {
+    List<Message> messages = new ArrayList<>();
+    if (payload == null) {
+      return messages;
+    }
+
+    String txt = payload.toString(StandardCharsets.UTF_8);
+    String[] lines = txt.split(LINE_BREAK);
+    for (String line : lines) {
+      if (line.trim().startsWith(WELL)) {
+        continue;
+      }
+      TableMessage message = new TableMessage();
+      try {
+        Matcher matcher = pattern.matcher(line.trim());
+        if (!matcher.matches()) {
+          log.warn("Invalid line protocol format ,line is {}", line);
+          continue;
+        }
+
+        // Parsing Table Names
+        message.setTable(matcher.group(TABLE));
+
+        // Parsing Tags
+        if (!setTags(matcher, message)) {
+          log.warn("The tags is error , line is {}", line);
+          continue;
+        }
+
+        // Parsing Attributes
+        if (!setAttributes(matcher, message)) {
+          log.warn("The attributes is error , line is {}", line);
+          continue;
+        }
+
+        // Parsing Fields
+        if (!setFields(matcher, message)) {
+          log.warn("The fields is error , line is {}", line);
+          continue;
+        }
+
+        // Parsing timestamp
+        if (!setTimestamp(matcher, message)) {
+          log.warn("The timestamp is error , line is {}", line);
+          continue;
+        }
+
+        messages.add(message);
+      } catch (Exception e) {
+        log.warn(
+            "The line pattern parsing fails, and the failed line message is {} 
,exception is",
+            line,
+            e);
+      }
+    }
+    return messages;
+  }
+
+  private boolean setTags(Matcher matcher, TableMessage message) {
+    List<String> tagKeys = new ArrayList<>();
+    List<Object> tagValues = new ArrayList<>();
+    String tagsGroup = matcher.group(TAGS);
+    if (tagsGroup != null && !tagsGroup.isEmpty()) {
+      String[] tagPairs = tagsGroup.split(COMMA);
+      for (String tagPair : tagPairs) {
+        if (!tagPair.isEmpty()) {
+          String[] keyValue = tagPair.split(EQUAL);
+          if (keyValue.length == 2 && !NULL.equals(keyValue[1])) {
+            tagKeys.add(keyValue[0]);
+            tagValues.add(new Binary[] {new 
Binary(keyValue[1].getBytes(StandardCharsets.UTF_8))});
+          }
+        }
+      }
+    }
+    if (!tagKeys.isEmpty() && !tagValues.isEmpty() && tagKeys.size() == 
tagValues.size()) {
+      message.setTagKeys(tagKeys);
+      message.setTagValues(tagValues);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private boolean setAttributes(Matcher matcher, TableMessage message) {
+    List<String> attributeKeys = new ArrayList<>();
+    List<Object> attributeValues = new ArrayList<>();
+    String attributesGroup = matcher.group(ATTRIBUTES);
+    if (attributesGroup != null && !attributesGroup.isEmpty()) {
+      String[] attributePairs = attributesGroup.split(COMMA);
+      for (String attributePair : attributePairs) {
+        if (!attributePair.isEmpty()) {
+          String[] keyValue = attributePair.split(EQUAL);
+          if (keyValue.length == 2 && !NULL.equals(keyValue[1])) {
+            attributeKeys.add(keyValue[0]);
+            attributeValues.add(
+                new Binary[] {new 
Binary(keyValue[1].getBytes(StandardCharsets.UTF_8))});
+          }
+        }
+      }
+    }
+    if (attributeKeys.size() == attributeValues.size()) {
+      message.setAttributeKeys(attributeKeys);
+      message.setAttributeValues(attributeValues);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private boolean setFields(Matcher matcher, TableMessage message) {
+    List<String> fields = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<Object> values = new ArrayList<>();
+    String fieldsGroup = matcher.group(FIELDS);
+    if (fieldsGroup != null && !fieldsGroup.isEmpty()) {
+      String[] fieldPairs = fieldsGroup.split(COMMA);
+      for (String fieldPair : fieldPairs) {
+        if (!fieldPair.isEmpty()) {
+          String[] keyValue = fieldPair.split(EQUAL);
+          if (keyValue.length == 2 && !NULL.equals(keyValue[1])) {
+            fields.add(keyValue[0]);
+            Pair<TSDataType, Object> typeAndValue = analyticValue(keyValue[1]);
+            values.add(typeAndValue.getRight());
+            dataTypes.add(typeAndValue.getLeft());
+          }
+        }
+      }
+    }
+    if (!fields.isEmpty() && !values.isEmpty() && fields.size() == 
values.size()) {
+      message.setFields(fields);
+      message.setDataTypes(dataTypes);
+      message.setValues(values);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private Pair<TSDataType, Object> analyticValue(String value) {
+    if (value.startsWith("\"") && value.endsWith("\"")) {
+      // String
+      return new Pair<>(
+          TSDataType.TEXT,
+          new Binary[] {
+            new Binary(value.substring(1, value.length() - 
1).getBytes(StandardCharsets.UTF_8))
+          });
+    } else if (value.equalsIgnoreCase("t")
+        || value.equalsIgnoreCase("true")
+        || value.equalsIgnoreCase("f")
+        || value.equalsIgnoreCase("false")) {
+      // boolean
+      return new Pair<>(
+          TSDataType.BOOLEAN,
+          new boolean[] {value.equalsIgnoreCase("t") || 
value.equalsIgnoreCase("true")});
+    } else if (value.endsWith("f")) {
+      // float
+      return new Pair<>(
+          TSDataType.FLOAT, new float[] {Float.parseFloat(value.substring(0, 
value.length() - 1))});
+    } else if (value.endsWith("i32")) {
+      // int
+      return new Pair<>(
+          TSDataType.INT32, new int[] {Integer.parseInt(value.substring(0, 
value.length() - 3))});
+    } else if (value.endsWith("u") || value.endsWith("i")) {
+      // long
+      return new Pair<>(
+          TSDataType.INT64, new long[] {Long.parseLong(value.substring(0, 
value.length() - 1))});
+    } else {
+      // double
+      return new Pair<>(TSDataType.DOUBLE, new double[] 
{Double.parseDouble(value)});
+    }
+  }
+
+  private boolean setTimestamp(Matcher matcher, TableMessage message) {
+    String timestampGroup = matcher.group(TIMESTAMP);
+    if (timestampGroup != null && !timestampGroup.isEmpty()) {
+      message.setTimestamp(Long.parseLong(timestampGroup));
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "table-line";
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 772f3672a14..b7311f0c0a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -20,9 +20,12 @@ package org.apache.iotdb.db.protocol.mqtt;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
 import org.apache.iotdb.db.protocol.session.MqttClientSession;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -32,7 +35,11 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDeviceP
 import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -45,12 +52,15 @@ import 
io.moquette.interception.messages.InterceptPublishMessage;
 import io.netty.buffer.ByteBuf;
 import io.netty.handler.codec.mqtt.MqttQoS;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.BitMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.ZoneId;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /** PublishHandler handle the messages from MQTT clients. */
 public class MPPPublishHandler extends AbstractInterceptHandler {
@@ -65,11 +75,13 @@ public class MPPPublishHandler extends 
AbstractInterceptHandler {
   private final PayloadFormatter payloadFormat;
   private final IPartitionFetcher partitionFetcher;
   private final ISchemaFetcher schemaFetcher;
+  private final boolean useTableInsert;
 
   public MPPPublishHandler(IoTDBConfig config) {
     this.payloadFormat = 
PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
     partitionFetcher = ClusterPartitionFetcher.getInstance();
     schemaFetcher = ClusterSchemaFetcher.getInstance();
+    useTableInsert = (payloadFormat instanceof LinePayloadFormatter);
   }
 
   @Override
@@ -87,7 +99,9 @@ public class MPPPublishHandler extends 
AbstractInterceptHandler {
           new String(msg.getPassword()),
           ZoneId.systemDefault().toString(),
           TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
-          ClientVersion.V_1_0);
+          ClientVersion.V_1_0,
+          useTableInsert ? IClientSession.SqlDialect.TABLE : 
IClientSession.SqlDialect.TREE);
+      sessionManager.registerSession(session);
       clientIdToSessionMap.put(msg.getClientID(), session);
     }
   }
@@ -96,6 +110,7 @@ public class MPPPublishHandler extends 
AbstractInterceptHandler {
   public void onDisconnect(InterceptDisconnectMessage msg) {
     MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
     if (null != session) {
+      sessionManager.removeCurrSession();
       sessionManager.closeSession(session, 
Coordinator.getInstance()::cleanupQueryExecution);
     }
   }
@@ -121,67 +136,24 @@ public class MPPPublishHandler extends 
AbstractInterceptHandler {
           topic,
           payload);
 
-      List<Message> events = payloadFormat.format(payload);
-      if (events == null) {
+      List<Message> messages = payloadFormat.format(payload);
+      if (messages == null) {
         return;
       }
 
-      for (Message event : events) {
-        if (event == null) {
+      for (Message message : messages) {
+        if (message == null) {
           continue;
         }
-
-        TSStatus tsStatus = null;
-        try {
-          InsertRowStatement statement = new InsertRowStatement();
-          statement.setDevicePath(
-              
DataNodeDevicePathCache.getInstance().getPartialPath(event.getDevice()));
-          
TimestampPrecisionUtils.checkTimestampPrecision(event.getTimestamp());
-          statement.setTime(event.getTimestamp());
-          statement.setMeasurements(event.getMeasurements().toArray(new 
String[0]));
-          if (event.getDataTypes() == null) {
-            statement.setDataTypes(new 
TSDataType[event.getMeasurements().size()]);
-            statement.setValues(event.getValues().toArray(new Object[0]));
-            statement.setNeedInferType(true);
-          } else {
-            List<TSDataType> dataTypes = event.getDataTypes();
-            List<String> values = event.getValues();
-            Object[] inferredValues = new Object[values.size()];
-            for (int i = 0; i < values.size(); ++i) {
-              inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), 
values.get(i));
-            }
-            statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
-            statement.setValues(inferredValues);
-          }
-          statement.setAligned(false);
-
-          tsStatus = AuthorityChecker.checkAuthority(statement, session);
-          if (tsStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-            LOG.warn(tsStatus.message);
-          } else {
-            long queryId = sessionManager.requestQueryId();
-            ExecutionResult result =
-                Coordinator.getInstance()
-                    .executeForTreeModel(
-                        statement,
-                        queryId,
-                        sessionManager.getSessionInfo(session),
-                        "",
-                        partitionFetcher,
-                        schemaFetcher,
-                        config.getQueryTimeoutThreshold(),
-                        false);
-            tsStatus = result.status;
-          }
-        } catch (Exception e) {
-          LOG.warn(
-              "meet error when inserting device {}, measurements {}, at time 
{}, because ",
-              event.getDevice(),
-              event.getMeasurements(),
-              event.getTimestamp(),
-              e);
+        if (useTableInsert) {
+          TableMessage tableMessage = (TableMessage) message;
+          // '/' previously defined as a database name
+          tableMessage.setDatabase(
+              msg.getTopicName().substring(0, 
msg.getTopicName().indexOf("/")));
+          insertTable(tableMessage, session);
+        } else {
+          insertTree((TreeMessage) message, session);
         }
-        LOG.debug("event process result: {}", tsStatus);
       }
     } finally {
       // release the payload of the message
@@ -189,6 +161,161 @@ public class MPPPublishHandler extends 
AbstractInterceptHandler {
     }
   }
 
+  /** Inserting table using tablet */
+  private void insertTable(TableMessage message, MqttClientSession session) {
+    TSStatus tsStatus = null;
+    try {
+      TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
+      InsertTabletStatement insertTabletStatement = 
constructInsertTabletStatement(message);
+      tsStatus = AuthorityChecker.checkAuthority(insertTabletStatement, 
session);
+      if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOG.warn(tsStatus.message);
+      } else {
+        session.setDatabaseName(message.getDatabase().toLowerCase());
+        session.setSqlDialect(IClientSession.SqlDialect.TABLE);
+        long queryId = sessionManager.requestQueryId();
+        SqlParser relationSqlParser = new SqlParser();
+        Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+        ExecutionResult result =
+            Coordinator.getInstance()
+                .executeForTableModel(
+                    insertTabletStatement,
+                    relationSqlParser,
+                    session,
+                    queryId,
+                    sessionManager.getSessionInfo(session),
+                    "",
+                    metadata,
+                    config.getQueryTimeoutThreshold());
+
+        tsStatus = result.status;
+        LOG.debug("process result: {}", tsStatus);
+        if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) 
{
+          LOG.warn("mqtt line insert error , message = {}", tsStatus.message);
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "meet error when inserting database {}, table {}, tags {}, 
attributes {}, fields {}, at time {}, because ",
+          message.getDatabase(),
+          message.getTable(),
+          message.getTagKeys(),
+          message.getAttributeKeys(),
+          message.getFields(),
+          message.getTimestamp(),
+          e);
+    }
+  }
+
+  private InsertTabletStatement constructInsertTabletStatement(TableMessage 
message)
+      throws IllegalPathException {
+    InsertTabletStatement insertStatement = new InsertTabletStatement();
+    insertStatement.setDevicePath(
+        
DataNodeDevicePathCache.getInstance().getPartialPath(message.getTable()));
+    List<String> measurements =
+        Stream.of(message.getFields(), message.getTagKeys(), 
message.getAttributeKeys())
+            .flatMap(List::stream)
+            .collect(Collectors.toList());
+    insertStatement.setMeasurements(measurements.toArray(new String[0]));
+    long[] timestamps = new long[] {message.getTimestamp()};
+    insertStatement.setTimes(timestamps);
+    int columnSize = measurements.size();
+    int rowSize = 1;
+
+    BitMap[] bitMaps = new BitMap[columnSize];
+    Object[] columns =
+        Stream.of(message.getValues(), message.getTagValues(), 
message.getAttributeValues())
+            .flatMap(List::stream)
+            .toArray(Object[]::new);
+    insertStatement.setColumns(columns);
+    insertStatement.setBitMaps(bitMaps);
+    insertStatement.setRowCount(rowSize);
+    insertStatement.setAligned(false);
+    insertStatement.setWriteToTable(true);
+    TSDataType[] dataTypes = new TSDataType[measurements.size()];
+    TsTableColumnCategory[] columnCategories = new 
TsTableColumnCategory[measurements.size()];
+    for (int i = 0; i < message.getFields().size(); i++) {
+      dataTypes[i] = message.getDataTypes().get(i);
+      columnCategories[i] = TsTableColumnCategory.FIELD;
+    }
+    for (int i = message.getFields().size();
+        i < message.getFields().size() + message.getTagKeys().size();
+        i++) {
+      dataTypes[i] = TSDataType.STRING;
+      columnCategories[i] = TsTableColumnCategory.TAG;
+    }
+    for (int i = message.getFields().size() + message.getTagKeys().size();
+        i
+            < message.getFields().size()
+                + message.getTagKeys().size()
+                + message.getAttributeKeys().size();
+        i++) {
+      dataTypes[i] = TSDataType.STRING;
+      columnCategories[i] = TsTableColumnCategory.ATTRIBUTE;
+    }
+    insertStatement.setDataTypes(dataTypes);
+    insertStatement.setColumnCategories(columnCategories);
+
+    return insertStatement;
+  }
+
+  private void insertTree(TreeMessage message, MqttClientSession session) {
+    TSStatus tsStatus = null;
+    try {
+      InsertRowStatement statement = new InsertRowStatement();
+      statement.setDevicePath(
+          
DataNodeDevicePathCache.getInstance().getPartialPath(message.getDevice()));
+      TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
+      statement.setTime(message.getTimestamp());
+      statement.setMeasurements(message.getMeasurements().toArray(new 
String[0]));
+      if (message.getDataTypes() == null) {
+        statement.setDataTypes(new 
TSDataType[message.getMeasurements().size()]);
+        statement.setValues(message.getValues().toArray(new Object[0]));
+        statement.setNeedInferType(true);
+      } else {
+        List<TSDataType> dataTypes = message.getDataTypes();
+        List<String> values = message.getValues();
+        Object[] inferredValues = new Object[values.size()];
+        for (int i = 0; i < values.size(); ++i) {
+          inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i), 
values.get(i));
+        }
+        statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
+        statement.setValues(inferredValues);
+      }
+      statement.setAligned(false);
+
+      tsStatus = AuthorityChecker.checkAuthority(statement, session);
+      if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOG.warn(tsStatus.message);
+      } else {
+        long queryId = sessionManager.requestQueryId();
+        ExecutionResult result =
+            Coordinator.getInstance()
+                .executeForTreeModel(
+                    statement,
+                    queryId,
+                    sessionManager.getSessionInfo(session),
+                    "",
+                    partitionFetcher,
+                    schemaFetcher,
+                    config.getQueryTimeoutThreshold(),
+                    false);
+        tsStatus = result.status;
+        LOG.debug("process result: {}", tsStatus);
+        if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) 
{
+          LOG.warn("mqtt json insert error , message = {}", tsStatus.message);
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "meet error when inserting device {}, measurements {}, at time {}, 
because ",
+          message.getDevice(),
+          message.getMeasurements(),
+          message.getTimestamp(),
+          e);
+    }
+  }
+
   @Override
   public void onSessionLoopError(Throwable throwable) {
     // TODO: Implement something sensible here ...
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
index 5ce81e55c15..ba31d869760 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
@@ -18,25 +18,10 @@
 
 package org.apache.iotdb.db.protocol.mqtt;
 
-import org.apache.tsfile.enums.TSDataType;
-
-import java.util.List;
-
-/** Message describes the information sometime sent from the devices. */
+/** Generic parsing of messages */
 public class Message {
-  private String device;
-  private Long timestamp;
-  private List<String> measurements;
-  private List<TSDataType> dataTypes;
-  private List<String> values;
 
-  public String getDevice() {
-    return device;
-  }
-
-  public void setDevice(String device) {
-    this.device = device;
-  }
+  protected Long timestamp;
 
   public Long getTimestamp() {
     return timestamp;
@@ -45,43 +30,4 @@ public class Message {
   public void setTimestamp(Long timestamp) {
     this.timestamp = timestamp;
   }
-
-  public List<String> getMeasurements() {
-    return measurements;
-  }
-
-  public void setMeasurements(List<String> measurements) {
-    this.measurements = measurements;
-  }
-
-  public List<TSDataType> getDataTypes() {
-    return dataTypes;
-  }
-
-  public void setDataTypes(List<TSDataType> dataTypes) {
-    this.dataTypes = dataTypes;
-  }
-
-  public List<String> getValues() {
-    return values;
-  }
-
-  public void setValues(List<String> values) {
-    this.values = values;
-  }
-
-  @Override
-  public String toString() {
-    return "Message{"
-        + "device='"
-        + device
-        + '\''
-        + ", timestamp="
-        + timestamp
-        + ", measurements="
-        + measurements
-        + ", values="
-        + values
-        + '}';
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
new file mode 100644
index 00000000000..b8aec19da58
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.protocol.mqtt;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import java.util.List;
+
+/** Message parsing into a table */
+public class TableMessage extends Message {
+
+  private String database;
+
+  private String table;
+
+  private List<String> tagKeys;
+
+  private List<Object> tagValues;
+
+  private List<String> attributeKeys;
+
+  private List<Object> attributeValues;
+
+  private List<String> fields;
+
+  private List<TSDataType> dataTypes;
+
+  private List<Object> values;
+
+  public String getDatabase() {
+    return database;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  public List<String> getTagKeys() {
+    return tagKeys;
+  }
+
+  public void setTagKeys(List<String> tagKeys) {
+    this.tagKeys = tagKeys;
+  }
+
+  public List<Object> getTagValues() {
+    return tagValues;
+  }
+
+  public void setTagValues(List<Object> tagValues) {
+    this.tagValues = tagValues;
+  }
+
+  public List<String> getAttributeKeys() {
+    return attributeKeys;
+  }
+
+  public void setAttributeKeys(List<String> attributeKeys) {
+    this.attributeKeys = attributeKeys;
+  }
+
+  public List<Object> getAttributeValues() {
+    return attributeValues;
+  }
+
+  public void setAttributeValues(List<Object> attributeValues) {
+    this.attributeValues = attributeValues;
+  }
+
+  public List<String> getFields() {
+    return fields;
+  }
+
+  public void setFields(List<String> fields) {
+    this.fields = fields;
+  }
+
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public List<Object> getValues() {
+    return values;
+  }
+
+  public void setValues(List<Object> values) {
+    this.values = values;
+  }
+
+  @Override
+  public String toString() {
+    return "TableMessage{"
+        + "database='"
+        + database
+        + '\''
+        + ", table='"
+        + table
+        + '\''
+        + ", tagKeys="
+        + tagKeys
+        + ", tagValues="
+        + tagValues
+        + ", attributeKeys="
+        + attributeKeys
+        + ", attributeValues="
+        + attributeValues
+        + ", fields="
+        + fields
+        + ", dataTypes="
+        + dataTypes
+        + ", values="
+        + values
+        + ", timestamp="
+        + timestamp
+        + '}';
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
similarity index 87%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
index 5ce81e55c15..9416ea3c838 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
@@ -15,17 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.iotdb.db.protocol.mqtt;
 
 import org.apache.tsfile.enums.TSDataType;
 
 import java.util.List;
 
-/** Message describes the information sometime sent from the devices. */
-public class Message {
+/** Message parsing into a tree */
+public class TreeMessage extends Message {
   private String device;
-  private Long timestamp;
   private List<String> measurements;
   private List<TSDataType> dataTypes;
   private List<String> values;
@@ -38,14 +36,6 @@ public class Message {
     this.device = device;
   }
 
-  public Long getTimestamp() {
-    return timestamp;
-  }
-
-  public void setTimestamp(Long timestamp) {
-    this.timestamp = timestamp;
-  }
-
   public List<String> getMeasurements() {
     return measurements;
   }
@@ -77,7 +67,7 @@ public class Message {
         + device
         + '\''
         + ", timestamp="
-        + timestamp
+        + super.timestamp
         + ", measurements="
         + measurements
         + ", values="
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
index ed2644292f0..08d8fe71b8f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
@@ -92,6 +92,8 @@ public class MQTTService implements IService {
     properties.setProperty(
         BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE,
         String.valueOf(iotDBConfig.getMqttHandlerPoolSize()));
+    properties.setProperty(
+        BrokerConstants.DATA_PATH_PROPERTY_NAME, 
String.valueOf(iotDBConfig.getMqttDataPath()));
     
properties.setProperty(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME, 
"true");
     properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, 
"false");
     
properties.setProperty(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, 
"true");
diff --git 
a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
 
b/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
index d4b949084d6..488d6d02d50 100644
--- 
a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
+++ 
b/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
@@ -18,3 +18,4 @@
 #
 
 org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter
+org.apache.iotdb.db.protocol.mqtt.LinePayloadFormatter
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
index 082225984ca..bc721406d9a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
@@ -40,7 +40,7 @@ public class JSONPayloadFormatterTest {
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
 
     JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    Message message = formatter.format(buf).get(0);
+    TreeMessage message = (TreeMessage) formatter.format(buf).get(0);
 
     assertEquals("root.sg.d1", message.getDevice());
     assertEquals(Long.valueOf(1586076045524L), message.getTimestamp());
@@ -61,7 +61,7 @@ public class JSONPayloadFormatterTest {
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
 
     JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    Message message = formatter.format(buf).get(1);
+    TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
 
     assertEquals("root.sg.d1", message.getDevice());
     assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
@@ -90,7 +90,7 @@ public class JSONPayloadFormatterTest {
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
 
     JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    Message message = formatter.format(buf).get(1);
+    TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
 
     assertEquals("root.sg.d2", message.getDevice());
     assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
@@ -119,7 +119,7 @@ public class JSONPayloadFormatterTest {
     ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
 
     JSONPayloadFormatter formatter = new JSONPayloadFormatter();
-    Message message = formatter.format(buf).get(3);
+    TreeMessage message = (TreeMessage) formatter.format(buf).get(3);
 
     assertEquals("root.sg.d2", message.getDevice());
     assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
new file mode 100644
index 00000000000..5651ca49b97
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.protocol.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.tsfile.utils.Binary;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class LinePayloadFormatterTest {
+
+  @Test
+  public void formatLine() {
+    String payload =
+        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f
 1";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+
+    LinePayloadFormatter formatter = new LinePayloadFormatter();
+    TableMessage message = (TableMessage) formatter.format(buf).get(0);
+
+    assertEquals("test1", message.getTable());
+    assertEquals(Long.valueOf(1L), message.getTimestamp());
+    assertEquals("tag1", message.getTagKeys().get(0));
+    assertEquals("attr1", message.getAttributeKeys().get(0));
+    assertEquals(
+        "value1",
+        ((Binary[]) 
message.getValues().get(0))[0].getStringValue(StandardCharsets.UTF_8));
+    assertEquals(1L, ((long[]) message.getValues().get(1))[0], 0);
+    assertEquals(2L, ((long[]) message.getValues().get(2))[0], 0);
+    assertEquals(3L, ((int[]) message.getValues().get(3))[0], 0);
+    assertTrue(((boolean[]) message.getValues().get(4))[0]);
+    assertFalse(((boolean[]) message.getValues().get(5))[0]);
+    assertEquals(4d, ((double[]) message.getValues().get(6))[0], 0);
+    assertEquals(5f, ((float[]) message.getValues().get(7))[0], 0);
+  }
+
+  @Test
+  public void formatBatchLine() {
+    String payload =
+        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"value1\",field2=1i,field3=1u 1 \n"
+            + "test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 
field4=\"value4\",field5=10i,field6=10i32 2 ";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+
+    LinePayloadFormatter formatter = new LinePayloadFormatter();
+    TableMessage message = (TableMessage) formatter.format(buf).get(1);
+
+    assertEquals("test2", message.getTable());
+    assertEquals(Long.valueOf(2L), message.getTimestamp());
+    assertEquals("tag3", message.getTagKeys().get(0));
+    assertEquals("attr3", message.getAttributeKeys().get(0));
+    assertEquals(10, ((int[]) message.getValues().get(2))[0], 0);
+  }
+
+  @Test
+  public void formatLineAnnotation() {
+    String payload =
+        "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 
field1=\"value1\",field2=1i,field3=1u 1 \n"
+            + " # test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 
field4=\"value4\",field5=10i,field6=10i32 2 ";
+
+    ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+
+    LinePayloadFormatter formatter = new LinePayloadFormatter();
+    List<Message> message = formatter.format(buf);
+
+    assertEquals(1, message.size());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
index 096f5d0d90d..6731e2efad5 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
@@ -37,6 +37,6 @@ public class PayloadFormatManagerTest {
 
   @Test
   public void getDefaultPayloadFormat() {
-    assertNotNull(PayloadFormatManager.getPayloadFormat("json"));
+    assertNotNull(PayloadFormatManager.getPayloadFormat("tree-json"));
   }
 }
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index aa670feb444..5ab621677cc 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1948,8 +1948,9 @@ mqtt_handler_pool_size=1
 
 # the mqtt message payload formatter.
 # effectiveMode: restart
+# Options: [tree-json, table-line]
 # Datatype: String
-mqtt_payload_formatter=json
+mqtt_payload_formatter=tree-json
 
 # max length of mqtt message in byte
 # effectiveMode: restart
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index de7aeae6e05..ba568eae896 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -280,6 +280,7 @@ public class IoTDBConstant {
   public static final String MQTT_PORT_NAME = "mqtt_port";
   public static final String MQTT_HANDLER_POOL_SIZE_NAME = 
"mqtt_handler_pool_size";
   public static final String MQTT_PAYLOAD_FORMATTER_NAME = 
"mqtt_payload_formatter";
+  public static final String MQTT_DATA_PATH = "mqtt_data_path";
   public static final String MQTT_MAX_MESSAGE_SIZE = "mqtt_max_message_size";
 
   // thrift

Reply via email to