This is an automated email from the ASF dual-hosted git repository.
critas pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 723bdd256f6 Fix data export logic in IoTDB-1.3 to avoid redundant high
frequency query (#17076)
723bdd256f6 is described below
commit 723bdd256f6a50dbf4688b974811a178f75fe946
Author: LimJiaWenBrenda <[email protected]>
AuthorDate: Thu Feb 5 16:23:34 2026 +0800
Fix data export logic in IoTDB-1.3 to avoid redundant high frequency query
(#17076)
* fix export data logic
* Fix spotless
* Fix review suggestions
---
.../org/apache/iotdb/tool/data/ExportData.java | 268 ++++++++++++---------
1 file changed, 159 insertions(+), 109 deletions(-)
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
index 67ee5f6d46e..e2a08af86d8 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
@@ -39,11 +39,9 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.utils.BytesUtils;
@@ -601,7 +599,7 @@ public class ExportData extends AbstractDataTool {
List<String> names = sessionDataSet.getColumnNames();
List<String> types = sessionDataSet.getColumnTypes();
if (EXPORT_SQL_TYPE_NAME.equalsIgnoreCase(exportType)) {
- writeSqlFile(sessionDataSet, path, names, linesPerFile);
+ writeSqlFile(sessionDataSet, path, names);
} else {
if (Boolean.TRUE.equals(needDataTypePrinted)) {
for (int i = 0; i < names.size(); i++) {
@@ -707,128 +705,180 @@ public class ExportData extends AbstractDataTool {
}
}
- public static void writeSqlFile(
- SessionDataSet sessionDataSet, String filePath, List<String> headers,
int linesPerFile)
+ private static void exportToSqlFileWithAlignDevice(
+ SessionDataSet sessionDataSet, String filePath, List<String>
measurementNames)
throws IOException, IoTDBConnectionException,
StatementExecutionException {
+
+ List<String> localMeasurementNames = new ArrayList<>(measurementNames);
+ if (CollectionUtils.isEmpty(localMeasurementNames) ||
localMeasurementNames.size() <= 1) {
+ return;
+ } else {
+ localMeasurementNames.remove("Time");
+ localMeasurementNames.remove("Device");
+ }
+ if (CollectionUtils.isEmpty(localMeasurementNames)) {
+ return;
+ }
+
+ String sqlPrefix =
+ String.format(
+ "INSERT INTO %s(TIMESTAMP,%s) ALIGNED VALUES (%s);\n",
+ "%s", String.join(",", localMeasurementNames), "%d,%s");
+
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ List<String> columnTypeList = iterator.getColumnTypeList();
+ int totalColumns = columnTypeList.size();
+ String deviceName = null;
int fileIndex = 0;
+ int currentLines = 0;
+ String filePathTemplate = filePath + "_%d" + ".sql";
+ FileWriter writer = null;
+ try {
+ while (iterator.next()) {
+ if (writer == null) {
+ writer = new FileWriter(String.format(filePathTemplate, fileIndex));
+ }
+ deviceName = iterator.getString(2);
+ if (deviceName.startsWith(SYSTEM_DATABASE + ".")) {
+ continue;
+ }
+ List<String> values = new ArrayList<>();
+ for (int index = 2; index < totalColumns; index++) {
+ String dataType = columnTypeList.get(index);
+ String value = iterator.getString(index + 1);
+ if (value == null) {
+ values.add("null");
+ continue;
+ }
+ if ("TEXT".equalsIgnoreCase(dataType)
+ || "STRING".equalsIgnoreCase(dataType)
+ || "DATE".equalsIgnoreCase(dataType)) {
+ values.add(String.format("\"%s\"", value));
+ } else if ("BLOB".equalsIgnoreCase(dataType)) {
+ if (value.length() >= 2 && (value.startsWith("0x") ||
value.startsWith("0X"))) {
+ values.add(String.format("X'%s'", value.substring(2)));
+ } else {
+ values.add(String.format("X'%s'", value));
+ }
+ } else {
+ values.add(value);
+ }
+ }
+ long timestamp = iterator.getLong(1);
+ writer.write(String.format(sqlPrefix, deviceName, timestamp,
String.join(",", values)));
+ currentLines += 1;
+
+ if (currentLines >= linesPerFile) {
+ writer.flush();
+ writer.close();
+ fileIndex += 1;
+ writer = null;
+ currentLines = 0;
+ }
+ }
+ } finally {
+ if (writer != null) {
+ writer.flush();
+ writer.close();
+ }
+ }
+ ioTPrinter.print("\n");
+ }
+
+ private static void exportToSqlFileWithoutAlign(
+ SessionDataSet sessionDataSet, String filePath, List<String> headers)
+ throws IoTDBConnectionException, StatementExecutionException,
IOException {
+
+ List<String> measurementNames = new ArrayList<>();
String deviceName = null;
- boolean writeNull = false;
- List<String> seriesList = new ArrayList<>(headers);
if (CollectionUtils.isEmpty(headers) || headers.size() <= 1) {
- writeNull = true;
+ return;
} else {
- if (headers.contains("Device")) {
- seriesList.remove("Time");
- seriesList.remove("Device");
- } else {
- Path path = new Path(seriesList.get(1), true);
- deviceName = path.getDevice();
- seriesList.remove("Time");
- for (int i = 0; i < seriesList.size(); i++) {
- String series = seriesList.get(i);
- path = new Path(series, true);
- seriesList.set(i, path.getMeasurement());
+ List<String> localHeaders = new ArrayList<>(headers);
+ localHeaders.remove("Time");
+ if (CollectionUtils.isEmpty(localHeaders)) {
+ return;
+ }
+ Path path = new Path(localHeaders.get(0), true);
+ deviceName = path.getDevice();
+ for (String header : localHeaders) {
+ path = new Path(header, true);
+ String meas = path.getMeasurement();
+ if (path.getDevice().equals(deviceName)) {
+ measurementNames.add(meas);
}
}
}
- boolean hasNext = true;
- while (hasNext) {
- int i = 0;
- final String finalFilePath = filePath + "_" + fileIndex + ".sql";
- try (FileWriter writer = new FileWriter(finalFilePath)) {
- if (writeNull) {
- break;
+ if (deviceName.startsWith(SYSTEM_DATABASE + ".")) {
+ return;
+ }
+ String sqlPrefix =
+ String.format(
+ "INSERT INTO %s(TIMESTAMP,%s) VALUES (%s);\n",
+ "%s", String.join(",", measurementNames), "%d,%s");
+
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ List<String> columnTypeList = iterator.getColumnTypeList();
+ int totalColumns = measurementNames.size();
+ int fileIndex = 0;
+ int currentLines = 0;
+ String filePathTemplate = filePath + "_%d" + ".sql";
+ FileWriter writer = null;
+ try {
+ while (iterator.next()) {
+ if (writer == null) {
+ writer = new FileWriter(String.format(filePathTemplate, fileIndex));
}
- while (i++ < linesPerFile) {
- if (sessionDataSet.hasNext()) {
- RowRecord rowRecord = sessionDataSet.next();
- List<Field> fields = rowRecord.getFields();
- List<String> headersTemp = new ArrayList<>(seriesList);
- List<String> timeseries = new ArrayList<>();
- if (headers.contains("Device")) {
- deviceName = fields.get(0).toString();
- if (deviceName.startsWith(SYSTEM_DATABASE + ".")) {
- continue;
- }
- for (String header : headersTemp) {
- timeseries.add(deviceName + "." + header);
- }
- } else {
- if (headers.get(1).startsWith(SYSTEM_DATABASE + ".")) {
- continue;
- }
- timeseries.addAll(headers);
- timeseries.remove(0);
- }
- String sqlMiddle = null;
- if (Boolean.TRUE.equals(aligned)) {
- sqlMiddle = " ALIGNED VALUES (" + rowRecord.getTimestamp() + ",";
+ List<String> values = new ArrayList<>();
+ for (int index = 0; index < totalColumns; index++) {
+ String dataType = columnTypeList.get(index + 1);
+ String value = iterator.getString(index + 2);
+ if (value == null) {
+ values.add("null");
+ continue;
+ }
+ if ("TEXT".equalsIgnoreCase(dataType)
+ || "STRING".equalsIgnoreCase(dataType)
+ || "DATE".equalsIgnoreCase(dataType)) {
+ values.add(String.format("\"%s\"", value));
+ } else if ("BLOB".equalsIgnoreCase(dataType)) {
+ if (value.length() >= 2 && (value.startsWith("0x") ||
value.startsWith("0X"))) {
+ values.add(String.format("X'%s'", value.substring(2)));
} else {
- sqlMiddle = " VALUES (" + rowRecord.getTimestamp() + ",";
- }
- List<String> values = new ArrayList<>();
- if (headers.contains("Device")) {
- fields.remove(0);
+ values.add(String.format("X'%s'", value));
}
- for (int index = 0; index < fields.size(); index++) {
- RowRecord next =
- session
- .executeQueryStatement("SHOW TIMESERIES " +
timeseries.get(index), timeout)
- .next();
- if (ObjectUtils.isNotEmpty(next)) {
- List<Field> timeseriesList = next.getFields();
- String value = fields.get(index).toString();
- if (value.equals("null")) {
- headersTemp.remove(seriesList.get(index));
- continue;
- }
- final String dataType = timeseriesList.get(3).getStringValue();
- if (TSDataType.TEXT.name().equalsIgnoreCase(dataType)
- || TSDataType.STRING.name().equalsIgnoreCase(dataType)) {
- values.add("\'" + value + "\'");
- } else if (TSDataType.BLOB.name().equalsIgnoreCase(dataType)) {
- final byte[] v = fields.get(index).getBinaryV().getValues();
- if (v == null) {
- values.add(null);
- } else {
- values.add(
-
BytesUtils.parseBlobByteArrayToString(v).replaceFirst("0x", "X'") + "'");
- }
- } else if (TSDataType.DATE.name().equalsIgnoreCase(dataType)) {
- final LocalDate dateV = fields.get(index).getDateV();
- if (dateV == null) {
- values.add(null);
- } else {
- values.add("'" + dateV.toString() + "'");
- }
- } else {
- values.add(value);
- }
- } else {
- headersTemp.remove(seriesList.get(index));
- continue;
- }
- }
- if (CollectionUtils.isNotEmpty(headersTemp)) {
- writer.write(
- "INSERT INTO "
- + deviceName
- + "(TIMESTAMP,"
- + String.join(",", headersTemp)
- + ")"
- + sqlMiddle
- + String.join(",", values)
- + ");\n");
- }
-
} else {
- hasNext = false;
- break;
+ values.add(value);
}
}
- fileIndex++;
+ long timestamp = iterator.getLong(1);
+ writer.write(String.format(sqlPrefix, deviceName, timestamp,
String.join(",", values)));
+ currentLines += 1;
+
+ if (currentLines >= linesPerFile) {
+ writer.flush();
+ writer.close();
+ fileIndex += 1;
+ writer = null;
+ currentLines = 0;
+ }
+ }
+ } finally {
+ if (writer != null) {
writer.flush();
+ writer.close();
}
}
+ ioTPrinter.print("\n");
+ }
+
+ public static void writeSqlFile(
+ SessionDataSet sessionDataSet, String filePath, List<String> headers)
+ throws IOException, IoTDBConnectionException,
StatementExecutionException {
+ if (headers.contains("Device")) {
+ exportToSqlFileWithAlignDevice(sessionDataSet, filePath, headers);
+ } else {
+ exportToSqlFileWithoutAlign(sessionDataSet, filePath, headers);
+ }
}
}