This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4dbda78d74f Added database check for flush on local & Optimized the UTF-8 param in IT (#17365)
4dbda78d74f is described below
commit 4dbda78d74f5f81d06aa6559bc16719f09cb8a0f
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 27 10:31:33 2026 +0800
Added database check for flush on local & Optimized the UTF-8 param in IT (#17365)
* fix
* bishop
* fix-it
---
.../it/env/cluster/node/AbstractNodeWrapper.java | 3 ++-
.../org/apache/iotdb/db/it/IoTDBFlushQueryIT.java | 9 +++++++
.../thrift/ConfigNodeRPCServiceProcessor.java | 6 ++---
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +-
.../config/executor/ClusterConfigTaskExecutor.java | 3 +--
.../plan/relational/sql/parser/AstBuilder.java | 6 ++---
.../iotdb/db/storageengine/StorageEngine.java | 31 ++++++++++++++++++----
7 files changed, 45 insertions(+), 15 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index 2ff390a8e83..5a271c1ead2 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -527,7 +527,8 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
"-XX:MaxDirectMemorySize=" + jvmConfig.getMaxDirectMemorySize()
+ "m",
"-Djdk.nio.maxCachedBufferSize=262144",
"-D" + IoTDBConstant.INTEGRATION_TEST_KILL_POINTS + "=" +
killPoints.toString(),
- "-Dsun.jnu.encoding=UTF-8 -Dfile.encoding=UTF-8",
+ "-Dsun.jnu.encoding=UTF-8",
+ "-Dfile.encoding=UTF-8",
"-cp",
server_node_lib_path));
addStartCmdParams(startCmd);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
index 7e3f7dff2cf..169975450c0 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java
@@ -188,6 +188,15 @@ public class IoTDBFlushQueryIT {
sqe.printStackTrace();
assertTrue(sqe.getMessage().contains(expectedMsg));
}
+ try {
+ statement.execute(
+ "FLUSH
root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2 on local");
+ } catch (SQLException sqe) {
+ String expectedMsg =
+ "500: Database
root.noexist.nodatagroup1,root.notExistGroup1,root.notExistGroup2 does not
exist on local";
+ sqe.printStackTrace();
+ assertTrue(sqe.getMessage().contains(expectedMsg));
+ }
} catch (Exception e) {
fail(e.getMessage());
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 5d6aa8da9f5..4d01f3770c2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -981,11 +981,11 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
@Override
public TSStatus flush(final TFlushReq req) throws TException {
if (req.storageGroups != null) {
- final List<String> noExistSg =
+ final List<String> noExistDB =
configManager.getPartitionManager().filterUnExistDatabases(req.storageGroups);
- if (!noExistSg.isEmpty()) {
+ if (!noExistDB.isEmpty()) {
final StringBuilder sb = new StringBuilder();
- noExistSg.forEach(storageGroup -> sb.append(storageGroup).append(","));
+ noExistDB.forEach(database -> sb.append(database).append(","));
return RpcUtils.getStatus(
TSStatusCode.DATABASE_NOT_EXIST,
"Database " + sb.subSequence(0, sb.length() - 1) + " does not
exist");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 3aa7aad7e4f..c2c3a8d16cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -2433,7 +2433,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus flush(TFlushReq req) throws TException {
try {
- storageEngine.operateFlush(req);
+ storageEngine.operateFlush(req, false);
} catch (Exception e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d0627351b27..fc28d7a5f32 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1264,8 +1264,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
} else {
try {
- StorageEngine.getInstance().operateFlush(tFlushReq);
- tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ tsStatus = StorageEngine.getInstance().operateFlush(tFlushReq, true);
} catch (final Exception e) {
tsStatus = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
e.getMessage());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
index 59fefd8d169..70dc79b6adb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java
@@ -1574,17 +1574,17 @@ public class AstBuilder extends
RelationalSqlBaseVisitor<Node> {
@Override
public Node visitFlushStatement(final
RelationalSqlParser.FlushStatementContext ctx) {
final FlushStatement flushStatement = new
FlushStatement(StatementType.FLUSH);
- List<String> storageGroups = null;
+ List<String> databases = null;
if (ctx.booleanValue() != null) {
flushStatement.setSeq(Boolean.parseBoolean(ctx.booleanValue().getText()));
}
flushStatement.setOnCluster(
ctx.localOrClusterMode() == null || ctx.localOrClusterMode().LOCAL()
== null);
if (ctx.identifier() != null) {
- storageGroups =
+ databases =
getIdentifiers(ctx.identifier()).stream().map(Identifier::getValue).collect(toList());
}
- flushStatement.setDatabases(storageGroups);
+ flushStatement.setDatabases(databases);
return new Flush(flushStatement, null);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index fc2f9a2cacd..2a1c21de040 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -555,6 +556,11 @@ public class StorageEngine implements IService {
checkResults(tasks, "Failed to sync close processor.");
}
+ public boolean containsDatabase(final String database) {
+ return dataRegionMap.values().stream()
+ .anyMatch(dataRegion -> Objects.equals(database,
dataRegion.getDatabaseName()));
+ }
+
public void syncCloseProcessorsInDatabase(String databaseName, boolean
isSeq) {
List<Future<Void>> tasks = new ArrayList<>();
for (DataRegion dataRegion : dataRegionMap.values()) {
@@ -662,22 +668,37 @@ public class StorageEngine implements IService {
}
}
- public void operateFlush(TFlushReq req) {
+ public TSStatus operateFlush(final TFlushReq req, final boolean onLocal) {
+ final StorageEngine storageEngine = StorageEngine.getInstance();
if (req.getRegionIds() != null && !req.getRegionIds().isEmpty()) {
-
StorageEngine.getInstance().syncCloseProcessorsInRegion(req.getRegionIds());
+ storageEngine.syncCloseProcessorsInRegion(req.getRegionIds());
} else if (req.storageGroups == null || req.storageGroups.isEmpty()) {
StorageEngine.getInstance().syncCloseAllProcessor();
WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
} else {
+ if (onLocal) {
+ final List<String> noExistDB =
+ req.storageGroups.stream()
+ .filter(database -> !storageEngine.containsDatabase(database))
+ .collect(Collectors.toList());
+ if (!noExistDB.isEmpty()) {
+ final StringBuilder sb = new StringBuilder();
+ noExistDB.forEach(database -> sb.append(database).append(","));
+ return RpcUtils.getStatus(
+ TSStatusCode.DATABASE_NOT_EXIST,
+ "Database " + sb.subSequence(0, sb.length() - 1) + " does not
exist on local");
+ }
+ }
for (String databaseName : req.storageGroups) {
if (req.isSeq == null) {
-
StorageEngine.getInstance().syncCloseProcessorsInDatabase(databaseName);
+ storageEngine.syncCloseProcessorsInDatabase(databaseName);
} else {
- StorageEngine.getInstance()
- .syncCloseProcessorsInDatabase(databaseName,
Boolean.parseBoolean(req.isSeq));
+ storageEngine.syncCloseProcessorsInDatabase(
+ databaseName, Boolean.parseBoolean(req.isSeq));
}
}
}
+ return StatusUtils.OK;
}
public void clearCache() {