This is an automated email from the ASF dual-hosted git repository.
critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6b5e5b00e4c [To Master] MQTT Extensions to the Table Model Using the
Row Protocol (#14848)
6b5e5b00e4c is described below
commit 6b5e5b00e4c1b73fc69fa5cbd6a693f36083fffa
Author: ppppoooo <[email protected]>
AuthorDate: Fri Feb 21 18:25:45 2025 +0800
[To Master] MQTT Extensions to the Table Model Using the Row Protocol
(#14848)
* mqtt
* It
* it
* youhua
* exclude
* it
* Update pom.xml
* dataPath
* Update pom.xml
* it
* rmset
* rename
* rename
* zhushi
---------
Co-authored-by: xz m <[email protected]>
Co-authored-by: CritasWang <[email protected]>
---
.../server/CustomizedJsonPayloadFormatter.java | 2 +-
.../org/apache/iotdb/mqtt/server/MyMessage.java | 31 +--
.../java/org/apache/iotdb/mqtt/MQTTClient.java | 39 +++-
integration-test/pom.xml | 4 +
.../iotdb/it/env/cluster/ClusterConstant.java | 1 +
.../it/env/cluster/config/MppCommonConfig.java | 6 +
.../it/env/cluster/config/MppDataNodeConfig.java | 12 +
.../env/cluster/config/MppSharedCommonConfig.java | 7 +
.../iotdb/it/env/cluster/node/DataNodeWrapper.java | 3 +
.../it/env/remote/config/RemoteCommonConfig.java | 5 +
.../it/env/remote/config/RemoteDataNodeConfig.java | 10 +
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 +
.../apache/iotdb/itbase/env/DataNodeConfig.java | 4 +
.../relational/it/mqtt/IoTDBMQTTServiceIT.java | 132 +++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 13 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 +
.../db/protocol/mqtt/JSONPayloadFormatter.java | 6 +-
.../db/protocol/mqtt/LinePayloadFormatter.java | 252 +++++++++++++++++++++
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 239 ++++++++++++++-----
.../org/apache/iotdb/db/protocol/mqtt/Message.java | 58 +----
.../iotdb/db/protocol/mqtt/TableMessage.java | 144 ++++++++++++
.../mqtt/{Message.java => TreeMessage.java} | 16 +-
.../org/apache/iotdb/db/service/MQTTService.java | 2 +
....apache.iotdb.db.protocol.mqtt.PayloadFormatter | 1 +
.../db/protocol/mqtt/JSONPayloadFormatterTest.java | 8 +-
.../db/protocol/mqtt/LinePayloadFormatterTest.java | 91 ++++++++
.../db/protocol/mqtt/PayloadFormatManagerTest.java | 2 +-
.../conf/iotdb-system.properties.template | 3 +-
.../apache/iotdb/commons/conf/IoTDBConstant.java | 1 +
29 files changed, 942 insertions(+), 156 deletions(-)
diff --git
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
index 5b708f1bcd5..171074d62a6 100644
---
a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
+++
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
@@ -43,7 +43,7 @@ public class CustomizedJsonPayloadFormatter implements
PayloadFormatter {
// this is just an example, so we just generate some Messages directly
for (int i = 0; i < 2; i++) {
long ts = i;
- Message message = new Message();
+ MyMessage message = new MyMessage();
message.setDevice("d" + i);
message.setTimestamp(ts);
message.setMeasurements(Arrays.asList("s1", "s2"));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
similarity index 68%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
copy to
example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
index 5ce81e55c15..d0ab7e08b4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
+++
b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/MyMessage.java
@@ -7,25 +7,26 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
+package org.apache.iotdb.mqtt.server;
-package org.apache.iotdb.db.protocol.mqtt;
+import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.tsfile.enums.TSDataType;
import java.util.List;
-/** Message describes the information sometime sent from the devices. */
-public class Message {
+public class MyMessage extends Message {
+
private String device;
- private Long timestamp;
private List<String> measurements;
private List<TSDataType> dataTypes;
private List<String> values;
@@ -38,14 +39,6 @@ public class Message {
this.device = device;
}
- public Long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(Long timestamp) {
- this.timestamp = timestamp;
- }
-
public List<String> getMeasurements() {
return measurements;
}
@@ -77,7 +70,7 @@ public class Message {
+ device
+ '\''
+ ", timestamp="
- + timestamp
+ + super.timestamp
+ ", measurements="
+ measurements
+ ", values="
diff --git a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
index e0efc473937..fad59224e6f 100644
--- a/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
+++ b/example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
@@ -25,6 +25,9 @@ import org.fusesource.mqtt.client.QoS;
import java.util.Random;
public class MQTTClient {
+
+ private static final String DATABASE = "myMqttTest";
+
public static void main(String[] args) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost("127.0.0.1", 1883);
@@ -35,7 +38,14 @@ public class MQTTClient {
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
+ // the config mqttPayloadFormatter must be tree-json
+ // jsonPayloadFormatter(connection);
+ // the config mqttPayloadFormatter must be table-line
+ linePayloadFormatter(connection);
+ connection.disconnect();
+ }
+ private static void jsonPayloadFormatter(BlockingConnection connection)
throws Exception {
Random random = new Random();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10; i++) {
@@ -58,7 +68,34 @@ public class MQTTClient {
sb.insert(0, "[");
sb.replace(sb.lastIndexOf(","), sb.length(), "]");
connection.publish("root.sg.d1.s1", sb.toString().getBytes(),
QoS.AT_LEAST_ONCE, false);
+ }
- connection.disconnect();
+  /**
+   * Publishes sample line-protocol payloads to the {@code DATABASE + "/myTopic"} topic, pausing
+   * briefly after each message. The server config mqttPayloadFormatter must be table-line, and the
+   * database must be created in advance.
+   *
+   * @param connection an MQTT connection that has already been connected
+   * @throws Exception if publishing fails or the pause between messages is interrupted
+   */
+  private static void linePayloadFormatter(BlockingConnection connection) throws Exception {
+    String[] payloads = {
+      // one row with tags, attributes and fields
+      "test1,tag1=t1,tag2=t2 attr3=a5,attr4=a4 field1=\"fieldValue1\",field2=1i,field3=1u 1",
+      // one row without attributes
+      "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 2",
+      // two rows in one message, separated by a line break
+      "test1,tag1=t1,tag2=t2 field7=t,field8=T,field9=true 3 \n "
+          + "test1,tag1=t1,tag2=t2 field7=f,field8=F,field9=FALSE 4",
+      "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"fieldValue1\",field2=1i,field3=1u 4 \n "
+          + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 5",
+      // a line starting with '#' is a remark
+      "# It's a remark\n " + "test1,tag1=t1,tag2=t2 field4=2,field5=2i32,field6=2f 6"
+    };
+    for (String payload : payloads) {
+      // Encode explicitly as UTF-8 instead of relying on the platform default charset.
+      connection.publish(
+          DATABASE + "/myTopic",
+          payload.getBytes(java.nio.charset.StandardCharsets.UTF_8),
+          QoS.AT_LEAST_ONCE,
+          false);
+      Thread.sleep(10);
+    }
   }
}
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 6438c36e3d5..e2165fe611f 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -225,6 +225,10 @@
<artifactId>jcip-annotations</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.fusesource.mqtt-client</groupId>
+ <artifactId>mqtt-client</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
index 2516bf66561..3162f47ebe8 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
@@ -164,6 +164,7 @@ public class ClusterConstant {
public static final String MQTT_HOST = "mqtt_host";
public static final String MQTT_PORT = "mqtt_port";
+ public static final String MQTT_DATA_PATH = "mqtt_data_path";
public static final String UDF_LIB_DIR = "udf_lib_dir";
public static final String TRIGGER_LIB_DIR = "trigger_lib_dir";
public static final String PIPE_LIB_DIR = "pipe_lib_dir";
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index af1771c1be6..8e8fd199eda 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -321,6 +321,12 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+  /**
+   * Sets the {@code mqtt_payload_formatter} property to the given formatter name.
+   *
+   * @param mqttPayloadFormatter formatter name, handed to {@code setProperty} unchanged
+   * @return this config
+   */
+  @Override
+  public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
+    // The argument is already a String, so it is passed straight through (as
+    // setSchemaEngineMode does) rather than via String.valueOf, which would turn a null
+    // argument into the literal text "null".
+    setProperty("mqtt_payload_formatter", mqttPayloadFormatter);
+    return this;
+  }
+
@Override
public CommonConfig setSchemaEngineMode(String schemaEngineMode) {
setProperty("schema_engine_mode", schemaEngineMode);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 3d6675c2f46..19dada3d06a 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -83,4 +83,16 @@ public class MppDataNodeConfig extends MppBaseConfig
implements DataNodeConfig {
"compaction_schedule_interval_in_ms",
String.valueOf(compactionScheduleInterval));
return this;
}
+
+ /** Writes the flag to the {@code enable_mqtt_service} property and returns this config. */
+ @Override
+ public DataNodeConfig setEnableMQTTService(boolean enableMQTTService) {
+ setProperty("enable_mqtt_service", String.valueOf(enableMQTTService));
+ return this;
+ }
+
+  /**
+   * Sets the {@code mqtt_payload_formatter} property to the given formatter name.
+   *
+   * @param mqttPayloadFormatter formatter name, handed to {@code setProperty} unchanged
+   * @return this config
+   */
+  @Override
+  public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
+    // The argument is already a String; String.valueOf would only turn a null argument into
+    // the literal text "null", so the value is passed straight through.
+    setProperty("mqtt_payload_formatter", mqttPayloadFormatter);
+    return this;
+  }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index ddfdaa7d698..24c5a3d5b4d 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -323,6 +323,13 @@ public class MppSharedCommonConfig implements CommonConfig
{
return this;
}
+ /** Forwards the formatter name to both {@code cnConfig} and {@code dnConfig}; returns this. */
+ @Override
+ public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
+ cnConfig.setMqttPayloadFormatter(mqttPayloadFormatter);
+ dnConfig.setMqttPayloadFormatter(mqttPayloadFormatter);
+ return this;
+ }
+
@Override
public CommonConfig setSchemaEngineMode(String schemaEngineMode) {
cnConfig.setSchemaEngineMode(schemaEngineMode);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index 76c81ffb3bb..1fe2d264269 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -58,6 +58,7 @@ import static
org.apache.iotdb.it.env.cluster.ClusterConstant.DN_WAL_DIRS;
import static
org.apache.iotdb.it.env.cluster.ClusterConstant.IOTDB_SYSTEM_PROPERTIES_FILE;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.MAIN_CLASS_NAME;
import static
org.apache.iotdb.it.env.cluster.ClusterConstant.MAX_TSBLOCK_SIZE_IN_BYTES;
+import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_DATA_PATH;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_HOST;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.MQTT_PORT;
import static
org.apache.iotdb.it.env.cluster.ClusterConstant.PAGE_SIZE_IN_BYTE;
@@ -115,6 +116,8 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
// Override mqtt properties of super class
immutableCommonProperties.setProperty(MQTT_HOST, super.getIp());
immutableCommonProperties.setProperty(MQTT_PORT,
String.valueOf(this.mqttPort));
+ immutableCommonProperties.setProperty(
+ MQTT_DATA_PATH, getNodePath() + File.separator + "mqttData");
immutableCommonProperties.setProperty(
PIPE_AIR_GAP_RECEIVER_PORT,
String.valueOf(this.pipeAirGapReceiverPort));
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index c97b3df98ca..300c2b109cd 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -226,6 +226,11 @@ public class RemoteCommonConfig implements CommonConfig {
return this;
}
+ /** No-op in this implementation: the argument is ignored and this config is returned. */
+ @Override
+ public CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
+ return this;
+ }
+
@Override
public CommonConfig setSchemaEngineMode(String schemaEngineMode) {
return this;
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 7ef4d9dd8cb..b109baa8203 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -48,4 +48,14 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
public DataNodeConfig setCompactionScheduleInterval(long
compactionScheduleInterval) {
return this;
}
+
+ /** No-op in this implementation: the argument is ignored and this config is returned. */
+ @Override
+ public DataNodeConfig setEnableMQTTService(boolean enableMQTTService) {
+ return this;
+ }
+
+ /** No-op in this implementation: the argument is ignored and this config is returned. */
+ @Override
+ public DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index 0b604bf97a9..0f547443d77 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -104,6 +104,8 @@ public interface CommonConfig {
CommonConfig setEnableMQTTService(boolean enableMQTTService);
+ /** Sets the name of the MQTT payload formatter (for example {@code table-line}). */
+ CommonConfig setMqttPayloadFormatter(String mqttPayloadFormatter);
+
CommonConfig setSchemaEngineMode(String schemaEngineMode);
CommonConfig setSelectIntoInsertTabletPlanRowLimit(int
selectIntoInsertTabletPlanRowLimit);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index ad68ae74a86..b8ad5c2f15b 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -33,4 +33,8 @@ public interface DataNodeConfig {
long loadTsFileAnalyzeSchemaMemorySizeInBytes);
DataNodeConfig setCompactionScheduleInterval(long
compactionScheduleInterval);
+
+ /** Sets whether the MQTT service is enabled on the data node. */
+ DataNodeConfig setEnableMQTTService(boolean enableMQTTService);
+
+ /** Sets the name of the MQTT payload formatter (for example {@code table-line}). */
+ DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
new file mode 100644
index 00000000000..fcf8af41327
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/mqtt/IoTDBMQTTServiceIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.relational.it.mqtt;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import org.apache.tsfile.read.common.Field;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test for the MQTT service configured with the {@code table-line} payload formatter:
+ * each test publishes one line-protocol message over MQTT and reads the row back through a table
+ * session.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBMQTTServiceIT {
+  private static final String IP = System.getProperty("RemoteIp", "127.0.0.1");
+  private static final String USER = System.getProperty("RemoteUser", "root");
+  private static final String PASSWORD = System.getProperty("RemotePassword", "root");
+  private static final String DATABASE = "mqtttest";
+  public static final String FORMATTER = "table-line";
+
+  /** Pause between publishing a message and querying for it, in milliseconds. */
+  private static final long INGEST_WAIT_MS = 1000;
+
+  private BlockingConnection connection;
+
+  /** Starts a cluster with the MQTT service enabled and connects an MQTT client to data node 0. */
+  @Before
+  public void setUp() throws Exception {
+    BaseEnv baseEnv = EnvFactory.getEnv();
+    baseEnv.getConfig().getDataNodeConfig().setEnableMQTTService(true);
+    baseEnv.getConfig().getDataNodeConfig().setMqttPayloadFormatter(FORMATTER);
+    baseEnv.initClusterEnvironment();
+    DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
+    int port = dataNodeWrapper.getMqttPort();
+    MQTT mqtt = new MQTT();
+    mqtt.setHost(IP, port);
+    mqtt.setUserName(USER);
+    mqtt.setPassword(PASSWORD);
+    mqtt.setConnectAttemptsMax(3);
+    mqtt.setReconnectDelay(10);
+
+    connection = mqtt.blockingConnection();
+    connection.connect();
+  }
+
+  /** Disconnects the MQTT client and always cleans the cluster environment. */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (connection != null) {
+        connection.disconnect();
+      }
+    } catch (IOException e) {
+      // Keep the cause so the test report shows why the disconnect failed.
+      throw new AssertionError("Failed to disconnect the MQTT client", e);
+    } finally {
+      // Runs even when the disconnect fails, so the cluster is never left behind.
+      EnvFactory.getEnv().cleanClusterEnvironment();
+    }
+  }
+
+  @Test
+  public void testNoAttr() throws Exception {
+    try (final ITableSession session =
+        EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
+      session.executeNonQueryStatement("CREATE DATABASE " + DATABASE);
+      publishAndWait("test1,tag1=t1,tag2=t2 field1=1,field2=1f,field3=1i32 1");
+      try (final SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select tag1,tag2,field1,field2,field3 from test1 where time = 1")) {
+        assertEquals(5, dataSet.getColumnNames().size());
+        List<Field> fields = firstRow(dataSet);
+        assertEquals("t1", fields.get(0).getStringValue());
+        assertEquals("t2", fields.get(1).getStringValue());
+        assertEquals(1d, fields.get(2).getDoubleV(), 0);
+        assertEquals(1f, fields.get(3).getFloatV(), 0);
+        assertEquals(1, fields.get(4).getIntV());
+      }
+    }
+  }
+
+  @Test
+  public void testWithAttr() throws Exception {
+    try (final ITableSession session =
+        EnvFactory.getEnv().getTableSessionConnectionWithDB(DATABASE)) {
+      session.executeNonQueryStatement("CREATE DATABASE " + DATABASE);
+      publishAndWait("test2,tag1=t1,tag2=t2 attr3=a3,attr4=a4 field1=1,field2=1f,field3=1i32 1");
+      try (final SessionDataSet dataSet =
+          session.executeQueryStatement(
+              "select tag1,tag2,attr3,attr4,field1,field2,field3 from test2 where time = 1")) {
+        assertEquals(7, dataSet.getColumnNames().size());
+        List<Field> fields = firstRow(dataSet);
+        assertEquals("t1", fields.get(0).getStringValue());
+        assertEquals("t2", fields.get(1).getStringValue());
+        assertEquals("a3", fields.get(2).getStringValue());
+        assertEquals("a4", fields.get(3).getStringValue());
+        assertEquals(1d, fields.get(4).getDoubleV(), 0);
+        assertEquals(1f, fields.get(5).getFloatV(), 0);
+        assertEquals(1, fields.get(6).getIntV());
+      }
+    }
+  }
+
+  /** Publishes one line-protocol payload to the test topic, then pauses before the caller queries. */
+  private void publishAndWait(String payload) throws Exception {
+    connection.publish(
+        DATABASE + "/myTopic",
+        payload.getBytes(java.nio.charset.StandardCharsets.UTF_8),
+        QoS.AT_LEAST_ONCE,
+        false);
+    Thread.sleep(INGEST_WAIT_MS);
+  }
+
+  /** Returns the fields of the first row, failing with a clear message when the result is empty. */
+  private static List<Field> firstRow(SessionDataSet dataSet) throws Exception {
+    if (!dataSet.hasNext()) {
+      fail("The published row was not returned by the query");
+    }
+    return dataSet.next().getFields();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c8a904909d4..2ff675b0196 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -113,7 +113,10 @@ public class IoTDBConfig {
private int mqttHandlerPoolSize = 1;
/** The mqtt message payload formatter. */
- private String mqttPayloadFormatter = "json";
+ private String mqttPayloadFormatter = "tree-json";
+
+ /** The path where MQTT data is saved. */
+ private String mqttDataPath = "data/";
/** Max mqtt message size. Unit: byte */
private int mqttMaxMessageSize = 1048576;
@@ -2726,6 +2729,14 @@ public class IoTDBConfig {
this.mqttPayloadFormatter = mqttPayloadFormatter;
}
+ /** Returns the configured MQTT data path ({@code mqttDataPath}). */
+ public String getMqttDataPath() {
+ return mqttDataPath;
+ }
+
+ /** Replaces the configured MQTT data path ({@code mqttDataPath}) with the given value. */
+ public void setMqttDataPath(String mqttDataPath) {
+ this.mqttDataPath = mqttDataPath;
+ }
+
public int getMqttMaxMessageSize() {
return mqttMaxMessageSize;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5d3b9fd5131..8184435eadd 100755
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1854,6 +1854,10 @@ public class IoTDBDescriptor {
properties.getProperty(IoTDBConstant.MQTT_PAYLOAD_FORMATTER_NAME).trim());
}
+ if (properties.getProperty(IoTDBConstant.MQTT_DATA_PATH) != null) {
+
conf.setMqttDataPath(properties.getProperty(IoTDBConstant.MQTT_DATA_PATH).trim());
+ }
+
if (properties.getProperty(IoTDBConstant.ENABLE_MQTT) != null) {
conf.setEnableMQTTService(
Boolean.parseBoolean(properties.getProperty(IoTDBConstant.ENABLE_MQTT).trim()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
index 524b0a4718d..9d0af6af6c5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
@@ -80,7 +80,7 @@ public class JSONPayloadFormatter implements PayloadFormatter
{
}
private List<Message> formatJson(JsonObject jsonObject) {
- Message message = new Message();
+ TreeMessage message = new TreeMessage();
message.setDevice(jsonObject.get(JSON_KEY_DEVICE).getAsString());
message.setTimestamp(jsonObject.get(JSON_KEY_TIMESTAMP).getAsLong());
message.setMeasurements(
@@ -106,7 +106,7 @@ public class JSONPayloadFormatter implements
PayloadFormatter {
List<Message> ret = new ArrayList<>(timestamps.size());
for (int i = 0; i < timestamps.size(); i++) {
- Message message = new Message();
+ TreeMessage message = new TreeMessage();
message.setDevice(device);
message.setTimestamp(timestamps.get(i));
message.setMeasurements(measurements);
@@ -118,6 +118,6 @@ public class JSONPayloadFormatter implements
PayloadFormatter {
@Override
public String getName() {
- return "json";
+ return "tree-json";
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
new file mode 100644
index 00000000000..62481e0fcb0
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.protocol.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Payload formatter for the table model that parses a line protocol. Every data line of a payload
+ * has the form
+ *
+ * <pre>
+ * myTable,tag1=value1,tag2=value2 attr1=value1,attr2=value2 fieldKey="fieldValue" 1740109006000
+ * </pre>
+ *
+ * <p>The attributes section is optional; at least one tag and at least one field are required.
+ * Lines are separated by a line break. Blank lines and lines whose first non-blank character is
+ * {@code #} are ignored. A pair whose value is the literal {@code null} is skipped. Field values
+ * are typed by their spelling: {@code "text"} is TEXT; {@code t}, {@code true}, {@code f} and
+ * {@code false} (case-insensitive) are BOOLEAN; a trailing {@code f} is FLOAT; a trailing {@code
+ * i32} is INT32; a trailing {@code i} or {@code u} is INT64; anything else is DOUBLE. A line that
+ * cannot be parsed is logged and skipped; the remaining lines are still processed.
+ */
+public class LinePayloadFormatter implements PayloadFormatter {
+
+  private static final Logger log = LoggerFactory.getLogger(LinePayloadFormatter.class);
+
+  /*
+   Regular expression matching the line protocol; the attributes section is optional.
+  */
+  private static final String REGEX =
+      "(?<table>\\w+)(,(?<tags>[^ ]+))?(\\s+(?<attributes>[^ ]+))?\\s+(?<fields>[^ ]+)\\s+(?<timestamp>\\d+)";
+  private static final String COMMA = ",";
+  private static final String WELL = "#";
+  private static final String LINE_BREAK = "\n";
+  private static final String EQUAL = "=";
+  private static final String TABLE = "table";
+  private static final String TAGS = "tags";
+  private static final String ATTRIBUTES = "attributes";
+  private static final String FIELDS = "fields";
+  private static final String TIMESTAMP = "timestamp";
+  private static final String NULL = "null";
+
+  /** Compiled once and shared: a {@link Pattern} is immutable and safe to use concurrently. */
+  private static final Pattern PATTERN = Pattern.compile(REGEX);
+
+  /**
+   * Parses every data line of the payload into a {@link TableMessage}.
+   *
+   * @param payload the raw MQTT payload, decoded as UTF-8; may be {@code null}
+   * @return the successfully parsed messages in payload order; empty when the payload is {@code
+   *     null} or contains no valid line
+   */
+  @Override
+  public List<Message> format(ByteBuf payload) {
+    List<Message> messages = new ArrayList<>();
+    if (payload == null) {
+      return messages;
+    }
+
+    String txt = payload.toString(StandardCharsets.UTF_8);
+    for (String rawLine : txt.split(LINE_BREAK)) {
+      String line = rawLine.trim();
+      // Blank lines and '#' remarks carry no data, so they are skipped without a warning.
+      if (line.isEmpty() || line.startsWith(WELL)) {
+        continue;
+      }
+      try {
+        Matcher matcher = PATTERN.matcher(line);
+        if (!matcher.matches()) {
+          log.warn("Invalid line protocol format ,line is {}", rawLine);
+          continue;
+        }
+
+        TableMessage message = new TableMessage();
+
+        // Parsing Table Names
+        message.setTable(matcher.group(TABLE));
+
+        // Parsing Tags
+        if (!setTags(matcher, message)) {
+          log.warn("The tags is error , line is {}", rawLine);
+          continue;
+        }
+
+        // Parsing Attributes (optional, so this step cannot reject the line)
+        setAttributes(matcher, message);
+
+        // Parsing Fields
+        if (!setFields(matcher, message)) {
+          log.warn("The fields is error , line is {}", rawLine);
+          continue;
+        }
+
+        // Parsing timestamp
+        if (!setTimestamp(matcher, message)) {
+          log.warn("The timestamp is error , line is {}", rawLine);
+          continue;
+        }
+
+        messages.add(message);
+      } catch (Exception e) {
+        // A malformed value (for example a non-numeric field) only invalidates its own line.
+        log.warn(
+            "The line pattern parsing fails, and the failed line message is {} ,exception is",
+            rawLine,
+            e);
+      }
+    }
+    return messages;
+  }
+
+  /**
+   * Splits a comma-separated {@code key=value} list and appends each usable pair to the given
+   * lists. A pair is skipped when it is empty, does not split into exactly a key and a value, or
+   * has the literal value {@code null}. Each value is stored as a one-element {@link Binary} array.
+   */
+  private static void collectBinaryPairs(String group, List<String> keys, List<Object> values) {
+    if (group == null || group.isEmpty()) {
+      return;
+    }
+    for (String pair : group.split(COMMA)) {
+      if (pair.isEmpty()) {
+        continue;
+      }
+      String[] keyValue = pair.split(EQUAL);
+      if (keyValue.length == 2 && !NULL.equals(keyValue[1])) {
+        keys.add(keyValue[0]);
+        values.add(new Binary[] {new Binary(keyValue[1].getBytes(StandardCharsets.UTF_8))});
+      }
+    }
+  }
+
+  /** Stores the tags on the message; returns {@code false} when the line has no usable tag. */
+  private boolean setTags(Matcher matcher, TableMessage message) {
+    List<String> tagKeys = new ArrayList<>();
+    List<Object> tagValues = new ArrayList<>();
+    collectBinaryPairs(matcher.group(TAGS), tagKeys, tagValues);
+    if (tagKeys.isEmpty()) {
+      return false;
+    }
+    message.setTagKeys(tagKeys);
+    message.setTagValues(tagValues);
+    return true;
+  }
+
+  /** Stores the attributes on the message; both lists are empty when the section is absent. */
+  private void setAttributes(Matcher matcher, TableMessage message) {
+    List<String> attributeKeys = new ArrayList<>();
+    List<Object> attributeValues = new ArrayList<>();
+    collectBinaryPairs(matcher.group(ATTRIBUTES), attributeKeys, attributeValues);
+    message.setAttributeKeys(attributeKeys);
+    message.setAttributeValues(attributeValues);
+  }
+
+  /**
+   * Stores the field names, data types and values on the message; returns {@code false} when the
+   * line has no usable field. A value that cannot be parsed makes {@link #analyticValue} throw,
+   * which the caller handles.
+   */
+  private boolean setFields(Matcher matcher, TableMessage message) {
+    List<String> fields = new ArrayList<>();
+    List<TSDataType> dataTypes = new ArrayList<>();
+    List<Object> values = new ArrayList<>();
+    String fieldsGroup = matcher.group(FIELDS);
+    if (fieldsGroup != null && !fieldsGroup.isEmpty()) {
+      for (String fieldPair : fieldsGroup.split(COMMA)) {
+        if (fieldPair.isEmpty()) {
+          continue;
+        }
+        String[] keyValue = fieldPair.split(EQUAL);
+        if (keyValue.length == 2 && !NULL.equals(keyValue[1])) {
+          // Parse before recording the name so the three lists always grow together.
+          Pair<TSDataType, Object> typeAndValue = analyticValue(keyValue[1]);
+          fields.add(keyValue[0]);
+          dataTypes.add(typeAndValue.getLeft());
+          values.add(typeAndValue.getRight());
+        }
+      }
+    }
+    if (fields.isEmpty()) {
+      return false;
+    }
+    message.setFields(fields);
+    message.setDataTypes(dataTypes);
+    message.setValues(values);
+    return true;
+  }
+
+  /**
+   * Infers the data type of a field value from its spelling and converts it. The checks run in
+   * this order, so a bare {@code f} is a boolean rather than a float.
+   *
+   * @return the data type paired with a one-element array holding the converted value
+   * @throws NumberFormatException if a numeric value cannot be parsed
+   */
+  private Pair<TSDataType, Object> analyticValue(String value) {
+    if (value.startsWith("\"") && value.endsWith("\"")) {
+      // String
+      return new Pair<>(
+          TSDataType.TEXT,
+          new Binary[] {
+            new Binary(value.substring(1, value.length() - 1).getBytes(StandardCharsets.UTF_8))
+          });
+    } else if (value.equalsIgnoreCase("t")
+        || value.equalsIgnoreCase("true")
+        || value.equalsIgnoreCase("f")
+        || value.equalsIgnoreCase("false")) {
+      // boolean
+      return new Pair<>(
+          TSDataType.BOOLEAN,
+          new boolean[] {value.equalsIgnoreCase("t") || value.equalsIgnoreCase("true")});
+    } else if (value.endsWith("f")) {
+      // float
+      return new Pair<>(
+          TSDataType.FLOAT, new float[] {Float.parseFloat(value.substring(0, value.length() - 1))});
+    } else if (value.endsWith("i32")) {
+      // int
+      return new Pair<>(
+          TSDataType.INT32, new int[] {Integer.parseInt(value.substring(0, value.length() - 3))});
+    } else if (value.endsWith("u") || value.endsWith("i")) {
+      // long
+      return new Pair<>(
+          TSDataType.INT64, new long[] {Long.parseLong(value.substring(0, value.length() - 1))});
+    } else {
+      // double
+      return new Pair<>(TSDataType.DOUBLE, new double[] {Double.parseDouble(value)});
+    }
+  }
+
+  /** Stores the timestamp on the message; returns {@code false} when the group is missing. */
+  private boolean setTimestamp(Matcher matcher, TableMessage message) {
+    String timestampGroup = matcher.group(TIMESTAMP);
+    if (timestampGroup == null || timestampGroup.isEmpty()) {
+      return false;
+    }
+    message.setTimestamp(Long.parseLong(timestampGroup));
+    return true;
+  }
+
+  /** Returns the name under which this formatter is selected: {@code table-line}. */
+  @Override
+  public String getName() {
+    return "table-line";
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 772f3672a14..b7311f0c0a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -20,9 +20,12 @@ package org.apache.iotdb.db.protocol.mqtt;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.MqttClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
@@ -32,7 +35,11 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDeviceP
import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -45,12 +52,15 @@ import
io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.BitMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZoneId;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/** PublishHandler handle the messages from MQTT clients. */
public class MPPPublishHandler extends AbstractInterceptHandler {
@@ -65,11 +75,13 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
private final PayloadFormatter payloadFormat;
private final IPartitionFetcher partitionFetcher;
private final ISchemaFetcher schemaFetcher;
+ private final boolean useTableInsert;
public MPPPublishHandler(IoTDBConfig config) {
this.payloadFormat =
PayloadFormatManager.getPayloadFormat(config.getMqttPayloadFormatter());
partitionFetcher = ClusterPartitionFetcher.getInstance();
schemaFetcher = ClusterSchemaFetcher.getInstance();
+ useTableInsert = (payloadFormat instanceof LinePayloadFormatter);
}
@Override
@@ -87,7 +99,9 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
new String(msg.getPassword()),
ZoneId.systemDefault().toString(),
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
- ClientVersion.V_1_0);
+ ClientVersion.V_1_0,
+ useTableInsert ? IClientSession.SqlDialect.TABLE :
IClientSession.SqlDialect.TREE);
+ sessionManager.registerSession(session);
clientIdToSessionMap.put(msg.getClientID(), session);
}
}
@@ -96,6 +110,7 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
public void onDisconnect(InterceptDisconnectMessage msg) {
MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
if (null != session) {
+ sessionManager.removeCurrSession();
sessionManager.closeSession(session,
Coordinator.getInstance()::cleanupQueryExecution);
}
}
@@ -121,67 +136,24 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
topic,
payload);
- List<Message> events = payloadFormat.format(payload);
- if (events == null) {
+ List<Message> messages = payloadFormat.format(payload);
+ if (messages == null) {
return;
}
- for (Message event : events) {
- if (event == null) {
+ for (Message message : messages) {
+ if (message == null) {
continue;
}
-
- TSStatus tsStatus = null;
- try {
- InsertRowStatement statement = new InsertRowStatement();
- statement.setDevicePath(
-
DataNodeDevicePathCache.getInstance().getPartialPath(event.getDevice()));
-
TimestampPrecisionUtils.checkTimestampPrecision(event.getTimestamp());
- statement.setTime(event.getTimestamp());
- statement.setMeasurements(event.getMeasurements().toArray(new
String[0]));
- if (event.getDataTypes() == null) {
- statement.setDataTypes(new
TSDataType[event.getMeasurements().size()]);
- statement.setValues(event.getValues().toArray(new Object[0]));
- statement.setNeedInferType(true);
- } else {
- List<TSDataType> dataTypes = event.getDataTypes();
- List<String> values = event.getValues();
- Object[] inferredValues = new Object[values.size()];
- for (int i = 0; i < values.size(); ++i) {
- inferredValues[i] = CommonUtils.parseValue(dataTypes.get(i),
values.get(i));
- }
- statement.setDataTypes(dataTypes.toArray(new TSDataType[0]));
- statement.setValues(inferredValues);
- }
- statement.setAligned(false);
-
- tsStatus = AuthorityChecker.checkAuthority(statement, session);
- if (tsStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOG.warn(tsStatus.message);
- } else {
- long queryId = sessionManager.requestQueryId();
- ExecutionResult result =
- Coordinator.getInstance()
- .executeForTreeModel(
- statement,
- queryId,
- sessionManager.getSessionInfo(session),
- "",
- partitionFetcher,
- schemaFetcher,
- config.getQueryTimeoutThreshold(),
- false);
- tsStatus = result.status;
- }
- } catch (Exception e) {
- LOG.warn(
- "meet error when inserting device {}, measurements {}, at time
{}, because ",
- event.getDevice(),
- event.getMeasurements(),
- event.getTimestamp(),
- e);
+ if (useTableInsert) {
+ TableMessage tableMessage = (TableMessage) message;
+ // the part of the topic name before the first '/' is used as the database name
+ tableMessage.setDatabase(
+ msg.getTopicName().substring(0,
msg.getTopicName().indexOf("/")));
+ insertTable(tableMessage, session);
+ } else {
+ insertTree((TreeMessage) message, session);
}
- LOG.debug("event process result: {}", tsStatus);
}
} finally {
// release the payload of the message
@@ -189,6 +161,161 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
}
}
+  /**
+   * Writes one parsed table message into the table model as a single-row tablet insert.
+   *
+   * <p>Nothing is propagated to the caller: a rejected authority check, a non-success execution
+   * status, and any exception raised while building or executing the statement are only logged
+   * as warnings.
+   *
+   * @param message message to insert; its database, table, columns and timestamp are read
+   * @param session MQTT client session the insert runs under; once the authority check passes,
+   *     its database name and SQL dialect are overwritten
+   */
+  private void insertTable(TableMessage message, MqttClientSession session) {
+    try {
+      TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
+      InsertTabletStatement insertTabletStatement = constructInsertTabletStatement(message);
+      TSStatus tsStatus = AuthorityChecker.checkAuthority(insertTabletStatement, session);
+      if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOG.warn(tsStatus.message);
+        return;
+      }
+
+      // Lower-case with a fixed locale: the no-argument toLowerCase() follows the JVM default
+      // locale, so the same topic could map to different database names (e.g. 'I' under a
+      // Turkish locale).
+      session.setDatabaseName(message.getDatabase().toLowerCase(java.util.Locale.ROOT));
+      session.setSqlDialect(IClientSession.SqlDialect.TABLE);
+      long queryId = sessionManager.requestQueryId();
+      SqlParser relationSqlParser = new SqlParser();
+      Metadata metadata = LocalExecutionPlanner.getInstance().metadata;
+      ExecutionResult result =
+          Coordinator.getInstance()
+              .executeForTableModel(
+                  insertTabletStatement,
+                  relationSqlParser,
+                  session,
+                  queryId,
+                  sessionManager.getSessionInfo(session),
+                  "",
+                  metadata,
+                  config.getQueryTimeoutThreshold());
+
+      tsStatus = result.status;
+      LOG.debug("process result: {}", tsStatus);
+      if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOG.warn("mqtt line insert error , message = {}", tsStatus.message);
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "meet error when inserting database {}, table {}, tags {}, attributes {}, fields {}, at time {}, because ",
+          message.getDatabase(),
+          message.getTable(),
+          message.getTagKeys(),
+          message.getAttributeKeys(),
+          message.getFields(),
+          message.getTimestamp(),
+          e);
+    }
+  }
+
+  /**
+   * Builds a one-row {@link InsertTabletStatement} for the table model from a parsed message.
+   *
+   * <p>Columns are laid out as fields first, then tags, then attributes. Field columns take
+   * their types from {@code message.getDataTypes()}; tag and attribute columns are typed
+   * {@code STRING}.
+   *
+   * @param message parsed message supplying the table name, column names, values and timestamp
+   * @return the populated statement, flagged as a table write
+   * @throws IllegalPathException propagated from the device path lookup for the table name
+   */
+  private InsertTabletStatement constructInsertTabletStatement(TableMessage message)
+      throws IllegalPathException {
+    InsertTabletStatement tablet = new InsertTabletStatement();
+    tablet.setDevicePath(
+        DataNodeDevicePathCache.getInstance().getPartialPath(message.getTable()));
+
+    String[] columnNames =
+        Stream.of(message.getFields(), message.getTagKeys(), message.getAttributeKeys())
+            .flatMap(List::stream)
+            .toArray(String[]::new);
+    tablet.setMeasurements(columnNames);
+    tablet.setTimes(new long[] {message.getTimestamp()});
+
+    Object[] columnValues =
+        Stream.of(message.getValues(), message.getTagValues(), message.getAttributeValues())
+            .flatMap(List::stream)
+            .toArray(Object[]::new);
+    tablet.setColumns(columnValues);
+    // One bitmap slot per column; every slot is left null.
+    tablet.setBitMaps(new BitMap[columnNames.length]);
+    tablet.setRowCount(1);
+    tablet.setAligned(false);
+    tablet.setWriteToTable(true);
+
+    // Index ranges of the three column groups inside the combined arrays.
+    int fieldEnd = message.getFields().size();
+    int tagEnd = fieldEnd + message.getTagKeys().size();
+    int attributeEnd = tagEnd + message.getAttributeKeys().size();
+
+    TSDataType[] columnTypes = new TSDataType[columnNames.length];
+    TsTableColumnCategory[] categories = new TsTableColumnCategory[columnNames.length];
+    for (int column = 0; column < attributeEnd; column++) {
+      if (column < fieldEnd) {
+        columnTypes[column] = message.getDataTypes().get(column);
+        categories[column] = TsTableColumnCategory.FIELD;
+      } else {
+        columnTypes[column] = TSDataType.STRING;
+        categories[column] =
+            column < tagEnd ? TsTableColumnCategory.TAG : TsTableColumnCategory.ATTRIBUTE;
+      }
+    }
+    tablet.setDataTypes(columnTypes);
+    tablet.setColumnCategories(categories);
+
+    return tablet;
+  }
+
+  /**
+   * Writes one tree-model message as a single-row insert.
+   *
+   * <p>When the message carries no data types, the raw values are handed over unchanged and
+   * type inference is requested on the statement; otherwise every value is converted with
+   * {@code CommonUtils.parseValue} using the declared type at the same index. Authority
+   * failures, non-success execution results and exceptions are logged as warnings and not
+   * propagated.
+   *
+   * @param message message supplying the device, timestamp, measurements and values
+   * @param session MQTT client session used for the authority check and execution
+   */
+  private void insertTree(TreeMessage message, MqttClientSession session) {
+    try {
+      InsertRowStatement rowStatement = new InsertRowStatement();
+      rowStatement.setDevicePath(
+          DataNodeDevicePathCache.getInstance().getPartialPath(message.getDevice()));
+      TimestampPrecisionUtils.checkTimestampPrecision(message.getTimestamp());
+      rowStatement.setTime(message.getTimestamp());
+      rowStatement.setMeasurements(message.getMeasurements().toArray(new String[0]));
+
+      if (message.getDataTypes() != null) {
+        List<TSDataType> declaredTypes = message.getDataTypes();
+        List<String> rawValues = message.getValues();
+        Object[] parsedValues = new Object[rawValues.size()];
+        int index = 0;
+        for (String rawValue : rawValues) {
+          parsedValues[index] = CommonUtils.parseValue(declaredTypes.get(index), rawValue);
+          index++;
+        }
+        rowStatement.setDataTypes(declaredTypes.toArray(new TSDataType[0]));
+        rowStatement.setValues(parsedValues);
+      } else {
+        rowStatement.setDataTypes(new TSDataType[message.getMeasurements().size()]);
+        rowStatement.setValues(message.getValues().toArray(new Object[0]));
+        rowStatement.setNeedInferType(true);
+      }
+      rowStatement.setAligned(false);
+
+      TSStatus status = AuthorityChecker.checkAuthority(rowStatement, session);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOG.warn(status.message);
+        return;
+      }
+
+      long queryId = sessionManager.requestQueryId();
+      ExecutionResult executionResult =
+          Coordinator.getInstance()
+              .executeForTreeModel(
+                  rowStatement,
+                  queryId,
+                  sessionManager.getSessionInfo(session),
+                  "",
+                  partitionFetcher,
+                  schemaFetcher,
+                  config.getQueryTimeoutThreshold(),
+                  false);
+      status = executionResult.status;
+      LOG.debug("process result: {}", status);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOG.warn("mqtt json insert error , message = {}", status.message);
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "meet error when inserting device {}, measurements {}, at time {}, because ",
+          message.getDevice(),
+          message.getMeasurements(),
+          message.getTimestamp(),
+          e);
+    }
+  }
+
@Override
public void onSessionLoopError(Throwable throwable) {
// TODO: Implement something sensible here ...
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
index 5ce81e55c15..ba31d869760 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
@@ -18,25 +18,10 @@
package org.apache.iotdb.db.protocol.mqtt;
-import org.apache.tsfile.enums.TSDataType;
-
-import java.util.List;
-
-/** Message describes the information sometime sent from the devices. */
+/** Generic parsing of messages */
public class Message {
- private String device;
- private Long timestamp;
- private List<String> measurements;
- private List<TSDataType> dataTypes;
- private List<String> values;
- public String getDevice() {
- return device;
- }
-
- public void setDevice(String device) {
- this.device = device;
- }
+ protected Long timestamp;
public Long getTimestamp() {
return timestamp;
@@ -45,43 +30,4 @@ public class Message {
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
-
- public List<String> getMeasurements() {
- return measurements;
- }
-
- public void setMeasurements(List<String> measurements) {
- this.measurements = measurements;
- }
-
- public List<TSDataType> getDataTypes() {
- return dataTypes;
- }
-
- public void setDataTypes(List<TSDataType> dataTypes) {
- this.dataTypes = dataTypes;
- }
-
- public List<String> getValues() {
- return values;
- }
-
- public void setValues(List<String> values) {
- this.values = values;
- }
-
- @Override
- public String toString() {
- return "Message{"
- + "device='"
- + device
- + '\''
- + ", timestamp="
- + timestamp
- + ", measurements="
- + measurements
- + ", values="
- + values
- + '}';
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
new file mode 100644
index 00000000000..b8aec19da58
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.protocol.mqtt;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import java.util.List;
+
+/**
+ * A single table-model record parsed from an MQTT payload.
+ *
+ * <p>Column names are held in three lists (fields, tag keys, attribute keys), each accompanied
+ * by a list of values. The timestamp is inherited from {@link Message}.
+ */
+public class TableMessage extends Message {
+
+  /** Name of the target database. */
+  private String database;
+
+  /** Name of the target table. */
+  private String table;
+
+  /** Names of the tag columns. */
+  private List<String> tagKeys;
+
+  /** Values of the tag columns. */
+  private List<Object> tagValues;
+
+  /** Names of the attribute columns. */
+  private List<String> attributeKeys;
+
+  /** Values of the attribute columns. */
+  private List<Object> attributeValues;
+
+  /** Names of the field columns. */
+  private List<String> fields;
+
+  /** Data types of the field columns. */
+  private List<TSDataType> dataTypes;
+
+  /** Values of the field columns. */
+  private List<Object> values;
+
+  public String getDatabase() {
+    return this.database;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getTable() {
+    return this.table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  public List<String> getTagKeys() {
+    return this.tagKeys;
+  }
+
+  public void setTagKeys(List<String> tagKeys) {
+    this.tagKeys = tagKeys;
+  }
+
+  public List<Object> getTagValues() {
+    return this.tagValues;
+  }
+
+  public void setTagValues(List<Object> tagValues) {
+    this.tagValues = tagValues;
+  }
+
+  public List<String> getAttributeKeys() {
+    return this.attributeKeys;
+  }
+
+  public void setAttributeKeys(List<String> attributeKeys) {
+    this.attributeKeys = attributeKeys;
+  }
+
+  public List<Object> getAttributeValues() {
+    return this.attributeValues;
+  }
+
+  public void setAttributeValues(List<Object> attributeValues) {
+    this.attributeValues = attributeValues;
+  }
+
+  public List<String> getFields() {
+    return this.fields;
+  }
+
+  public void setFields(List<String> fields) {
+    this.fields = fields;
+  }
+
+  public List<TSDataType> getDataTypes() {
+    return this.dataTypes;
+  }
+
+  public void setDataTypes(List<TSDataType> dataTypes) {
+    this.dataTypes = dataTypes;
+  }
+
+  public List<Object> getValues() {
+    return this.values;
+  }
+
+  public void setValues(List<Object> values) {
+    this.values = values;
+  }
+
+  /** Renders every property, including the inherited timestamp, for logging. */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("TableMessage{");
+    text.append("database='").append(database).append('\'');
+    text.append(", table='").append(table).append('\'');
+    text.append(", tagKeys=").append(tagKeys);
+    text.append(", tagValues=").append(tagValues);
+    text.append(", attributeKeys=").append(attributeKeys);
+    text.append(", attributeValues=").append(attributeValues);
+    text.append(", fields=").append(fields);
+    text.append(", dataTypes=").append(dataTypes);
+    text.append(", values=").append(values);
+    text.append(", timestamp=").append(timestamp);
+    return text.append('}').toString();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
similarity index 87%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
index 5ce81e55c15..9416ea3c838 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
@@ -15,17 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.iotdb.db.protocol.mqtt;
import org.apache.tsfile.enums.TSDataType;
import java.util.List;
-/** Message describes the information sometime sent from the devices. */
-public class Message {
+/** Message parsing into a tree */
+public class TreeMessage extends Message {
private String device;
- private Long timestamp;
private List<String> measurements;
private List<TSDataType> dataTypes;
private List<String> values;
@@ -38,14 +36,6 @@ public class Message {
this.device = device;
}
- public Long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(Long timestamp) {
- this.timestamp = timestamp;
- }
-
public List<String> getMeasurements() {
return measurements;
}
@@ -77,7 +67,7 @@ public class Message {
+ device
+ '\''
+ ", timestamp="
- + timestamp
+ + super.timestamp
+ ", measurements="
+ measurements
+ ", values="
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
index ed2644292f0..08d8fe71b8f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
@@ -92,6 +92,8 @@ public class MQTTService implements IService {
properties.setProperty(
BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE,
String.valueOf(iotDBConfig.getMqttHandlerPoolSize()));
+ properties.setProperty(
+ BrokerConstants.DATA_PATH_PROPERTY_NAME,
String.valueOf(iotDBConfig.getMqttDataPath()));
properties.setProperty(BrokerConstants.IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME,
"true");
properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME,
"false");
properties.setProperty(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME,
"true");
diff --git
a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
b/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
index d4b949084d6..488d6d02d50 100644
---
a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
+++
b/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
@@ -18,3 +18,4 @@
#
org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter
+org.apache.iotdb.db.protocol.mqtt.LinePayloadFormatter
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
index 082225984ca..bc721406d9a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
@@ -40,7 +40,7 @@ public class JSONPayloadFormatterTest {
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
- Message message = formatter.format(buf).get(0);
+ TreeMessage message = (TreeMessage) formatter.format(buf).get(0);
assertEquals("root.sg.d1", message.getDevice());
assertEquals(Long.valueOf(1586076045524L), message.getTimestamp());
@@ -61,7 +61,7 @@ public class JSONPayloadFormatterTest {
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
- Message message = formatter.format(buf).get(1);
+ TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
assertEquals("root.sg.d1", message.getDevice());
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
@@ -90,7 +90,7 @@ public class JSONPayloadFormatterTest {
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
- Message message = formatter.format(buf).get(1);
+ TreeMessage message = (TreeMessage) formatter.format(buf).get(1);
assertEquals("root.sg.d2", message.getDevice());
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
@@ -119,7 +119,7 @@ public class JSONPayloadFormatterTest {
ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
JSONPayloadFormatter formatter = new JSONPayloadFormatter();
- Message message = formatter.format(buf).get(3);
+ TreeMessage message = (TreeMessage) formatter.format(buf).get(3);
assertEquals("root.sg.d2", message.getDevice());
assertEquals(Long.valueOf(1586076065526L), message.getTimestamp());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
new file mode 100644
index 00000000000..5651ca49b97
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.protocol.mqtt;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.tsfile.utils.Binary;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Unit tests for parsing line payloads into {@link TableMessage}s with LinePayloadFormatter. */
+public class LinePayloadFormatterTest {
+
+ // Single line: two tags, two attributes, eight fields (one per value syntax), timestamp 1.
+ @Test
+ public void formatLine() {
+ String payload =
+ "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2
field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f
1";
+
+ ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+
+ LinePayloadFormatter formatter = new LinePayloadFormatter();
+ TableMessage message = (TableMessage) formatter.format(buf).get(0);
+
+ assertEquals("test1", message.getTable());
+ assertEquals(Long.valueOf(1L), message.getTimestamp());
+ assertEquals("tag1", message.getTagKeys().get(0));
+ assertEquals("attr1", message.getAttributeKeys().get(0));
+ // Each field value is read back as a one-element array of the expected Java type.
+ assertEquals(
+ "value1",
+ ((Binary[])
message.getValues().get(0))[0].getStringValue(StandardCharsets.UTF_8));
+ assertEquals(1L, ((long[]) message.getValues().get(1))[0], 0);
+ assertEquals(2L, ((long[]) message.getValues().get(2))[0], 0);
+ assertEquals(3L, ((int[]) message.getValues().get(3))[0], 0);
+ assertTrue(((boolean[]) message.getValues().get(4))[0]);
+ assertFalse(((boolean[]) message.getValues().get(5))[0]);
+ assertEquals(4d, ((double[]) message.getValues().get(6))[0], 0);
+ assertEquals(5f, ((float[]) message.getValues().get(7))[0], 0);
+ }
+
+ // Two lines separated by '\n'; the assertions inspect the second parsed message.
+ @Test
+ public void formatBatchLine() {
+ String payload =
+ "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2
field1=\"value1\",field2=1i,field3=1u 1 \n"
+ + "test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4
field4=\"value4\",field5=10i,field6=10i32 2 ";
+
+ ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+
+ LinePayloadFormatter formatter = new LinePayloadFormatter();
+ TableMessage message = (TableMessage) formatter.format(buf).get(1);
+
+ assertEquals("test2", message.getTable());
+ assertEquals(Long.valueOf(2L), message.getTimestamp());
+ assertEquals("tag3", message.getTagKeys().get(0));
+ assertEquals("attr3", message.getAttributeKeys().get(0));
+ assertEquals(10, ((int[]) message.getValues().get(2))[0], 0);
+ }
+
+ // The second line starts with '#'; only one message is expected from the two lines.
+ @Test
+ public void formatLineAnnotation() {
+ String payload =
+ "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2
field1=\"value1\",field2=1i,field3=1u 1 \n"
+ + " # test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4
field4=\"value4\",field5=10i,field6=10i32 2 ";
+
+ ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8);
+
+ LinePayloadFormatter formatter = new LinePayloadFormatter();
+ List<Message> message = formatter.format(buf);
+
+ assertEquals(1, message.size());
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
index 096f5d0d90d..6731e2efad5 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
@@ -37,6 +37,6 @@ public class PayloadFormatManagerTest {
@Test
public void getDefaultPayloadFormat() {
- assertNotNull(PayloadFormatManager.getPayloadFormat("json"));
+ assertNotNull(PayloadFormatManager.getPayloadFormat("tree-json"));
}
}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index aa670feb444..5ab621677cc 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1948,8 +1948,9 @@ mqtt_handler_pool_size=1
# the mqtt message payload formatter.
# effectiveMode: restart
+# Options: [tree-json, table-line]
# Datatype: String
-mqtt_payload_formatter=json
+mqtt_payload_formatter=tree-json
# max length of mqtt message in byte
# effectiveMode: restart
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index de7aeae6e05..ba568eae896 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -280,6 +280,7 @@ public class IoTDBConstant {
public static final String MQTT_PORT_NAME = "mqtt_port";
public static final String MQTT_HANDLER_POOL_SIZE_NAME =
"mqtt_handler_pool_size";
public static final String MQTT_PAYLOAD_FORMATTER_NAME =
"mqtt_payload_formatter";
+ public static final String MQTT_DATA_PATH = "mqtt_data_path";
public static final String MQTT_MAX_MESSAGE_SIZE = "mqtt_max_message_size";
// thrift