This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f2ab237bf76 [feature](cache) support file cache admission control
(#59065)
f2ab237bf76 is described below
commit f2ab237bf76b167a4cce8c3e056abfd28eb4a2a4
Author: Wen Zhenghu <[email protected]>
AuthorDate: Fri Mar 20 12:11:26 2026 +0800
[feature](cache) support file cache admission control (#59065)
To fully understand the implementation of this PR, please refer to the
following link (a document in Chinese):
https://www.notion.so/V3-1-2c31293e1081807ca476dd5c87efb28e
### 1. PR Function Overview
The core function of this PR is the implementation of **File Cache
Admission Control**.
* **Background**: Doris supports using File Cache to accelerate data
access for remote storage (e.g., HDFS, S3). However, in certain
scenarios (such as large-scale ETL jobs or heavy ad-hoc queries),
reading massive amounts of cold data can evict existing hot data from
the File Cache. This leads to "cache pollution" and a significant drop
in cache hit rates.
* **Goal**: Provide a mechanism to decide whether data scanned by a
specific query is allowed to enter the File Cache, based on dimensions
such as user identity and table information. If admission is denied, the
query will read the data directly from remote storage without populating
the cache.
### 2. Implementation Scheme Analysis
The implementation consists of the following key components:
#### 2.1. FE Side: Admission Decision
The primary logic is located in the `createScanRangeLocations` method of
`FileQueryScanNode.java`.
* **New Configuration**: Introduced the
`Config.enable_file_cache_admission_control` switch.
* **New Manager**: Introduced `FileCacheAdmissionManager` (Singleton) to
execute the specific admission judgment logic.
* **Decision Flow**:
1. Before generating Scan Ranges, `FileQueryScanNode` retrieves the
current User Identity (`userIdentity`), Catalog, Database, and Table
information.
2. It calls `FileCacheAdmissionManager.getInstance().isAllowed(...)` to
obtain a boolean result `fileCacheAdmission`.
3. It logs the decision result and the time cost in the query plan:
```
| 0:VHIVE_SCAN_NODE(74)
| table: test_file_cache_features.tpch1_parquet.lineitem
| inputSplitNum=10, totalFileSize=205792918, scanRanges=10
| partition=1/1
| cardinality=1469949, numNodes=1
| pushdown agg=NONE
| file cache request ADMITTED: user_identity:root@%, reason:user table-level whitelist rule, cost:37996 ns
| limit: 1
```
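The decision flow above can be sketched in miniature. The class below is illustrative only — `SimpleAdmissionChecker` and its string-keyed rule sets are not the real `FileCacheAdmissionManager` API, which keys rules by user identity, catalog, database, and table — but it shows the blacklist-before-whitelist ordering and the deny-by-default fallback described in this PR:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the FE-side admission decision: a blacklist is
// consulted before a whitelist, and the default is to deny admission.
class SimpleAdmissionChecker {
    private final Set<String> denyTables = new HashSet<>();
    private final Set<String> allowTables = new HashSet<>();

    void deny(String qualifiedTable)  { denyTables.add(qualifiedTable); }
    void allow(String qualifiedTable) { allowTables.add(qualifiedTable); }

    // Returns whether data scanned from catalog.db.table may enter the file cache.
    boolean isAllowed(String catalog, String db, String table) {
        String key = catalog + "." + db + "." + table;
        if (denyTables.contains(key)) {
            return false;   // blacklist rule wins first
        }
        if (allowTables.contains(key)) {
            return true;    // whitelist rule admits the scan
        }
        return false;       // default rule: do not populate the cache
    }
}

public class AdmissionFlowSketch {
    public static void main(String[] args) {
        SimpleAdmissionChecker checker = new SimpleAdmissionChecker();
        checker.allow("hive.tpch1_parquet.lineitem");

        long start = System.nanoTime();
        boolean admitted = checker.isAllowed("hive", "tpch1_parquet", "lineitem");
        long costNs = System.nanoTime() - start;

        // Mirrors the kind of line the PR prints into the query plan.
        System.out.println("file cache request " + (admitted ? "ADMITTED" : "DENIED")
                + ", cost:" + costNs + " ns");
    }
}
```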
#### 2.2. FE Side: Decision Propagation
The decision result needs to be propagated from the `FileQueryScanNode`
down to the underlying split assignment logic.
* **SplitAssignment Modification**:
* The constructor of the `SplitAssignment` class (located in
`org.apache.doris.datasource`) is modified to accept a new `boolean
fileCacheAdmission` parameter.
* **SplitToScanRange Modification**:
* The `splitToScanRange` method (or its corresponding Lambda expression)
is updated to receive the `fileCacheAdmission` parameter.
* This method is responsible for setting this value into the Thrift
object.
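The propagation path can be modeled with a couple of stub classes (names and fields here are hypothetical; the real `SplitAssignment` and scan-range objects carry much more state): the boolean decided once at plan time is stored in the assignment and stamped onto every scan range it produces.

```java
// Hypothetical, simplified versions of the scan-range object and
// SplitAssignment used only to illustrate the propagation pattern.
class ScanRangeDesc {
    boolean fileCacheAdmission;
}

class SimpleSplitAssignment {
    private final boolean fileCacheAdmission;

    // Analogous to the new boolean parameter added to the real constructor.
    SimpleSplitAssignment(boolean fileCacheAdmission) {
        this.fileCacheAdmission = fileCacheAdmission;
    }

    // Stands in for splitToScanRange: the plan-time decision is copied
    // into every generated scan range.
    ScanRangeDesc splitToScanRange(String splitPath) {
        ScanRangeDesc desc = new ScanRangeDesc();
        desc.fileCacheAdmission = this.fileCacheAdmission;
        return desc;
    }
}

public class PropagationSketch {
    public static void main(String[] args) {
        SimpleSplitAssignment assignment = new SimpleSplitAssignment(false);
        ScanRangeDesc desc = assignment.splitToScanRange("s3://bucket/part-0.parquet");
        System.out.println("fileCacheAdmission=" + desc.fileCacheAdmission);
    }
}
```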
#### 2.3. Communication Protocol: Thrift Definition Update
To pass the FE's decision to the BE, the Thrift definition (likely
`TFileRangeDesc` or `TFileScanRangeParams` in `PlanNodes.thrift`)
requires a new field.
* **Inferred Change**: A new field, `optional bool file_cache_admission`,
is added to the `TFileRangeDesc` struct.
#### 2.4. BE Side: Enforcement
Although the analysis focuses on the FE, the complete loop requires
enforcement on the BE side:
* **FileReader**: The BE's `FileReader` (e.g., `HdfsFileReader` or
`S3FileReader`) checks the `file_cache_admission` flag in the incoming
`TFileRangeDesc` during initialization or reading.
* **Cache Policy**:
* If `file_cache_admission` is **true** (default): It uses the standard
`FileCachePolicy`, where data not found in the cache is written to the
Block File Cache after reading.
* If `file_cache_admission` is **false**: It sets the `FileCachePolicy`
to `NO_CACHE` and skips the cache-writing step, reading directly from
remote storage. This protects the existing cache from being polluted.
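The BE-side selection (implemented in C++ in `be/src/io/file_factory.cpp`) reduces to a conjunction of three conditions. The Java sketch below mirrors that condition for illustration; the enum values and the `pickPolicy` helper are hypothetical, not the actual BE API:

```java
enum FileCachePolicy { NO_CACHE, FILE_BLOCK_CACHE }

public class CachePolicySketch {
    // The block cache is used only when it is enabled globally, enabled for
    // the query, AND the scan range was admitted by the FE.
    static FileCachePolicy pickPolicy(boolean cacheEnabledGlobally,
                                      boolean cacheEnabledForQuery,
                                      boolean fileCacheAdmission) {
        if (cacheEnabledGlobally && cacheEnabledForQuery && fileCacheAdmission) {
            return FileCachePolicy.FILE_BLOCK_CACHE;
        }
        return FileCachePolicy.NO_CACHE; // read-through; do not populate the cache
    }

    public static void main(String[] args) {
        System.out.println(pickPolicy(true, true, false)); // NO_CACHE: admission denied
        System.out.println(pickPolicy(true, true, true));  // FILE_BLOCK_CACHE
    }
}
```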
### 3. Summary
This PR introduces an **Admission Control Manager** during the FE query
planning phase and transparently passes this control signal through the
**Split Assignment** and **Scan Range Generation** phases. This
ultimately guides the BE side's file readers to **selectively** use the
file cache.
Co-authored-by: xuchenhao <[email protected]>
Co-authored-by: morningman <[email protected]>
---
be/src/format/csv/csv_reader.cpp | 5 +-
be/src/format/json/new_json_reader.cpp | 3 +
be/src/format/orc/vorc_reader.cpp | 3 +
be/src/format/parquet/vparquet_reader.cpp | 3 +
be/src/io/file_factory.cpp | 2 +-
be/src/io/file_factory.h | 1 +
.../main/java/org/apache/doris/common/Config.java | 10 +
.../src/main/java/org/apache/doris/DorisFE.java | 5 +
.../org/apache/doris/common/ConfigWatcher.java | 53 +-
.../datasource/FileCacheAdmissionManager.java | 721 +++++++++++++++++++++
.../apache/doris/datasource/FileQueryScanNode.java | 80 ++-
.../org/apache/doris/datasource/FileScanNode.java | 16 +
.../apache/doris/datasource/SplitAssignment.java | 8 +-
.../apache/doris/datasource/SplitToScanRange.java | 3 +-
.../doris/nereids/parser/LogicalPlanBuilder.java | 51 ++
.../java/org/apache/doris/planner/ScanNode.java | 4 +
.../datasource/FileCacheAdmissionManagerTest.java | 437 +++++++++++++
.../FileCacheAdmissionRuleRefresherTest.java | 247 +++++++
.../doris/datasource/SplitAssignmentTest.java | 27 +-
gensrc/thrift/PlanNodes.thrift | 1 +
tools/export_mysql_rule_to_json.sh | 83 +++
21 files changed, 1742 insertions(+), 21 deletions(-)
diff --git a/be/src/format/csv/csv_reader.cpp b/be/src/format/csv/csv_reader.cpp
index 24012e5bd43..63045f8bfeb 100644
--- a/be/src/format/csv/csv_reader.cpp
+++ b/be/src/format/csv/csv_reader.cpp
@@ -1,4 +1,4 @@
-// Licensed to the Apache Software Foundation (ASF) under one
+// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
@@ -226,6 +226,9 @@ void CsvReader::_init_file_description() {
if (_range.__isset.fs_name) {
_file_description.fs_name = _range.fs_name;
}
+ if (_range.__isset.file_cache_admission) {
+ _file_description.file_cache_admission = _range.file_cache_admission;
+ }
}
Status CsvReader::init_reader(bool is_load) {
diff --git a/be/src/format/json/new_json_reader.cpp b/be/src/format/json/new_json_reader.cpp
index dfc50306605..cecfcf3f0dc 100644
--- a/be/src/format/json/new_json_reader.cpp
+++ b/be/src/format/json/new_json_reader.cpp
@@ -171,6 +171,9 @@ void NewJsonReader::_init_file_description() {
if (_range.__isset.fs_name) {
_file_description.fs_name = _range.fs_name;
}
+ if (_range.__isset.file_cache_admission) {
+ _file_description.file_cache_admission = _range.file_cache_admission;
+ }
}
Status NewJsonReader::init_reader(
diff --git a/be/src/format/orc/vorc_reader.cpp b/be/src/format/orc/vorc_reader.cpp
index acc2bf6243f..9304680a9e4 100644
--- a/be/src/format/orc/vorc_reader.cpp
+++ b/be/src/format/orc/vorc_reader.cpp
@@ -1493,6 +1493,9 @@ void OrcReader::_init_file_description() {
if (_scan_range.__isset.fs_name) {
_file_description.fs_name = _scan_range.fs_name;
}
+ if (_scan_range.__isset.file_cache_admission) {
+ _file_description.file_cache_admission = _scan_range.file_cache_admission;
+ }
}
DataTypePtr OrcReader::convert_to_doris_type(const orc::Type* orc_type) {
diff --git a/be/src/format/parquet/vparquet_reader.cpp b/be/src/format/parquet/vparquet_reader.cpp
index c9a166cead5..f2e04304771 100644
--- a/be/src/format/parquet/vparquet_reader.cpp
+++ b/be/src/format/parquet/vparquet_reader.cpp
@@ -388,6 +388,9 @@ void ParquetReader::_init_file_description() {
if (_scan_range.__isset.fs_name) {
_file_description.fs_name = _scan_range.fs_name;
}
+ if (_scan_range.__isset.file_cache_admission) {
+ _file_description.file_cache_admission = _scan_range.file_cache_admission;
+ }
}
Status ParquetReader::init_reader(
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index efe6cb3c4fa..31d7a7801af 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -67,7 +67,7 @@ io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state,
};
if (config::enable_file_cache && state != nullptr &&
state->query_options().__isset.enable_file_cache &&
- state->query_options().enable_file_cache) {
+ state->query_options().enable_file_cache && fd.file_cache_admission) {
opts.cache_type = io::FileCachePolicy::FILE_BLOCK_CACHE;
}
if (state != nullptr &&
state->query_options().__isset.file_cache_base_path &&
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 61e322ca0af..4966137508d 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -71,6 +71,7 @@ struct FileDescription {
// because for a hive table, differenet partitions may have different
// locations(or fs), so different files may have different fs.
std::string fs_name;
+ bool file_cache_admission = true;
};
} // namespace io
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index f9be5d87559..a25b65fca27 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3457,6 +3457,16 @@ public class Config extends ConfigBase {
"The maximum length of the first row error message when data quality error occurs, default is 256 bytes"})
public static int first_error_msg_max_length = 256;
+ @ConfField(mutable = false, description = {
+ "Whether to enable file cache admission control(Blocklist and Allowlist)"
+ })
+ public static boolean enable_file_cache_admission_control = false;
+
+ @ConfField(mutable = false, description = {
+ "Directory path for storing admission rules JSON files"
+ })
+ public static String file_cache_admission_control_json_dir = "";
+
@ConfField
public static String cloud_snapshot_handler_class =
"org.apache.doris.cloud.snapshot.CloudSnapshotHandler";
@ConfField
diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
index afdf4c69e87..f456d87f29f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.lock.DeadlockMonitor;
import org.apache.doris.common.util.JdkUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.FileCacheAdmissionManager;
import org.apache.doris.httpv2.HttpServer;
import org.apache.doris.journal.bdbje.BDBDebugger;
import org.apache.doris.journal.bdbje.BDBTool;
@@ -225,6 +226,10 @@ public class DorisFE {
Env.getCurrentEnv().initialize(args);
Env.getCurrentEnv().waitForReady();
+ if (Config.enable_file_cache_admission_control) {
+ FileCacheAdmissionManager.getInstance().loadOnStartup();
+ }
+
// init and start:
// 1. HttpServer for HTTP Server
// 2. FeServer for Thrift Server
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ConfigWatcher.java b/fe/fe-core/src/main/java/org/apache/doris/common/ConfigWatcher.java
index 65530b91f7d..27fb11bfe88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ConfigWatcher.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ConfigWatcher.java
@@ -32,6 +32,7 @@ import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
+import java.util.function.Consumer;
/*
* used for watch config changed
@@ -41,6 +42,10 @@ public class ConfigWatcher extends Daemon {
public final Path configPath;
+ private Consumer<Path> onCreateConsumer = null;
+ private Consumer<Path> onModifyConsumer = null;
+ private Consumer<Path> onDeleteConsumer = null;
+
public ConfigWatcher(String configPathStr) {
super("config watcher");
Preconditions.checkState(!Strings.isNullOrEmpty(configPathStr));
@@ -95,16 +100,58 @@ public class ConfigWatcher extends Daemon {
}
}
+ public void setOnCreateConsumer(Consumer<Path> consumer) {
+ this.onCreateConsumer = consumer;
+ }
+
+ public void setOnModifyConsumer(Consumer<Path> consumer) {
+ this.onModifyConsumer = consumer;
+ }
+
+ public void setOnDeleteConsumer(Consumer<Path> consumer) {
+ this.onDeleteConsumer = consumer;
+ }
+
private void handleCreate(Path filePath) {
- // TODO(cmy): implement if needed
+ if (onCreateConsumer != null) {
+ try {
+ onCreateConsumer.accept(filePath);
+ } catch (Exception e) {
+ LOG.error("Error in onCreateConsumer for file created in
directory: " + filePath, e);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
LOG.debug("File created in directory but no onCreateConsumer set: " + filePath);
+ }
+ }
}
private void handleDelete(Path filePath) {
- // TODO(cmy): implement if needed
+ if (onDeleteConsumer != null) {
+ try {
+ onDeleteConsumer.accept(filePath);
+ } catch (Exception e) {
+ LOG.error("Error in onDeleteConsumer for file deleted from directory: " + filePath, e);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File deleted from directory but no onDeleteConsumer set: " + filePath);
+ }
+ }
}
private void handleModify(Path filePath) {
- // TODO(cmy): implement if needed
+ if (onModifyConsumer != null) {
+ try {
+ onModifyConsumer.accept(filePath);
+ } catch (Exception e) {
+ LOG.error("Error in onModifyConsumer for file modified in directory: " + filePath, e);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File modified in directory but no onModifyConsumer set: " + filePath);
+ }
+ }
}
// for test
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileCacheAdmissionManager.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileCacheAdmissionManager.java
new file mode 100644
index 00000000000..11cd15e0d30
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileCacheAdmissionManager.java
@@ -0,0 +1,721 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ConfigWatcher;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class FileCacheAdmissionManager {
+ private static final Logger LOG = LogManager.getLogger(FileCacheAdmissionManager.class);
+
+ public enum RuleType {
+ EXCLUDE(0),
+ INCLUDE(1);
+
+ private final int value;
+
+ RuleType(int value) {
+ this.value = value;
+ }
+
+ public static RuleType fromValue(int value) {
+ if (value == 0) {
+ return EXCLUDE;
+ } else if (value == 1) {
+ return INCLUDE;
+ }
+ throw new IllegalArgumentException("Invalid RuleType Value: " + value);
+ }
+ }
+
+ public enum RuleLevel {
+ PARTITION, // 0
+ TABLE, // 1
+ DATABASE, // 2
+ CATALOG, // 3
+ GLOBAL, // 4
+ INVALID // 5
+ }
+
+ public static class RulePattern {
+ private final long id;
+ private final String userIdentity;
+ private final String catalog;
+ private final String database;
+ private final String table;
+ private final String partitionPattern;
+ private final RuleType ruleType;
+
+ public RulePattern(long id, String userIdentity, String catalog, String database,
+ String table, String partitionPattern, RuleType ruleType) {
+ this.id = id;
+ this.userIdentity = userIdentity;
+ this.catalog = catalog != null ? catalog : "";
+ this.database = database != null ? database : "";
+ this.table = table != null ? table : "";
+ this.partitionPattern = partitionPattern != null ? partitionPattern : "";
+ this.ruleType = ruleType;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public String getUserIdentity() {
+ return userIdentity;
+ }
+
+ public String getCatalog() {
+ return catalog;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getPartitionPattern() {
+ return partitionPattern;
+ }
+
+ public RuleType getRuleType() {
+ return ruleType;
+ }
+ }
+
+ public static class AdmissionRule {
+ private final long id;
+ private final String userIdentity;
+ private final String catalog;
+ private final String database;
+ private final String table;
+ private final String partitionPattern;
+ private final RuleType ruleType;
+ private final boolean enabled;
+ private final long createdTime;
+ private final long updatedTime;
+
+ @JsonCreator
+ public AdmissionRule(
+ @JsonProperty("id") long id,
+ @JsonProperty("user_identity") String userIdentity,
+ @JsonProperty("catalog_name") String catalog,
+ @JsonProperty("database_name") String database,
+ @JsonProperty("table_name") String table,
+ @JsonProperty("partition_pattern") String partitionPattern,
+ @JsonProperty("rule_type") int ruleType,
+ @JsonProperty("enabled") boolean enabled,
+ @JsonProperty("created_time") long createdTime,
+ @JsonProperty("updated_time") long updatedTime) {
+ this.id = id;
+ this.userIdentity = userIdentity != null ? userIdentity : "";
+ this.catalog = catalog != null ? catalog : "";
+ this.database = database != null ? database : "";
+ this.table = table != null ? table : "";
+ this.partitionPattern = partitionPattern != null ? partitionPattern : "";
+ this.ruleType = RuleType.fromValue(ruleType);
+ this.enabled = enabled;
+ this.createdTime = createdTime;
+ this.updatedTime = updatedTime;
+ }
+
+ public RulePattern toRulePattern() {
+ return new RulePattern(id, userIdentity, catalog, database, table, partitionPattern, ruleType);
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public String getUserIdentity() {
+ return userIdentity;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public String getCatalog() {
+ return catalog;
+ }
+
+ public String getPartitionPattern() {
+ return partitionPattern;
+ }
+
+ public RuleType getRuleType() {
+ return ruleType;
+ }
+
+ public boolean getEnabled() {
+ return enabled;
+ }
+
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ public long getUpdatedTime() {
+ return updatedTime;
+ }
+ }
+
+ public static class RuleLoader {
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ public static List<AdmissionRule> loadRulesFromFile(String filePath) throws Exception {
+ File file = new File(filePath);
+ if (!file.exists()) {
+ throw new IllegalArgumentException("File cache admission JSON file does not exist: " + filePath);
+ }
+
+ return MAPPER.readValue(file, new TypeReference<List<AdmissionRule>>() {});
+ }
+ }
+
+ public static class RuleCollection {
+ private boolean excludeGlobal = false;
+ private final Set<String> excludeCatalogRules = new HashSet<>();
+ private final Map<String, Set<String>> excludeDatabaseRules = new HashMap<>();
+ private final Map<String, Set<String>> excludeTableRules = new HashMap<>();
+
+ private boolean includeGlobal = false;
+ private final Set<String> includeCatalogRules = new HashSet<>();
+ private final Map<String, Set<String>> includeDatabaseRules = new HashMap<>();
+ private final Map<String, Set<String>> includeTableRules = new HashMap<>();
+
+ public static final String REASON_COMMON_CATALOG_BLACKLIST = "common catalog-level blacklist rule";
+ public static final String REASON_COMMON_CATALOG_WHITELIST = "common catalog-level whitelist rule";
+ public static final String REASON_COMMON_DATABASE_BLACKLIST = "common database-level blacklist rule";
+ public static final String REASON_COMMON_DATABASE_WHITELIST = "common database-level whitelist rule";
+ public static final String REASON_COMMON_TABLE_BLACKLIST = "common table-level blacklist rule";
+ public static final String REASON_COMMON_TABLE_WHITELIST = "common table-level whitelist rule";
+ public static final String REASON_USER_GLOBAL_BLACKLIST = "user global-level blacklist rule";
+ public static final String REASON_USER_GLOBAL_WHITELIST = "user global-level whitelist rule";
+ public static final String REASON_USER_CATALOG_BLACKLIST = "user catalog-level blacklist rule";
+ public static final String REASON_USER_CATALOG_WHITELIST = "user catalog-level whitelist rule";
+ public static final String REASON_USER_DATABASE_BLACKLIST = "user database-level blacklist rule";
+ public static final String REASON_USER_DATABASE_WHITELIST = "user database-level whitelist rule";
+ public static final String REASON_USER_TABLE_BLACKLIST = "user table-level blacklist rule";
+ public static final String REASON_USER_TABLE_WHITELIST = "user table-level whitelist rule";
+ public static final String REASON_DEFAULT = "default rule";
+
+ public boolean isAdmittedAtTableLevel(String userIdentity, String catalog, String database, String table,
+ AtomicReference<String> reason) {
+
+ String catalogDatabase = catalog + "." + database;
+
+ if (containsKeyValue(excludeTableRules, table, catalogDatabase)) {
+ reason.set(REASON_COMMON_TABLE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(includeTableRules, table, catalogDatabase)) {
+ reason.set(REASON_COMMON_TABLE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (containsKeyValue(excludeDatabaseRules, database, catalog)) {
+ reason.set(REASON_COMMON_DATABASE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(includeDatabaseRules, database, catalog)) {
+ reason.set(REASON_COMMON_DATABASE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (excludeCatalogRules.contains(catalog)) {
+ reason.set(REASON_COMMON_CATALOG_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (includeCatalogRules.contains(catalog)) {
+ reason.set(REASON_COMMON_CATALOG_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+
+ reason.set(REASON_DEFAULT);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+
+ public boolean isAdmittedAtTableLevel(RuleCollection userCollection, String userIdentity,
+ String catalog, String database, String table,
+ AtomicReference<String> reason) {
+
+ String catalogDatabase = catalog + "." + database;
+
+ if (containsKeyValue(excludeTableRules, table, catalogDatabase)) {
+ reason.set(REASON_COMMON_TABLE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(userCollection.excludeTableRules, table,
catalogDatabase)) {
+ reason.set(REASON_USER_TABLE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(includeTableRules, table, catalogDatabase)) {
+ reason.set(REASON_COMMON_TABLE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (containsKeyValue(userCollection.includeTableRules, table,
catalogDatabase)) {
+ reason.set(REASON_USER_TABLE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (containsKeyValue(excludeDatabaseRules, database, catalog)) {
+ reason.set(REASON_COMMON_DATABASE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(userCollection.excludeDatabaseRules,
database, catalog)) {
+ reason.set(REASON_USER_DATABASE_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (containsKeyValue(includeDatabaseRules, database, catalog)) {
+ reason.set(REASON_COMMON_DATABASE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (containsKeyValue(userCollection.includeDatabaseRules,
database, catalog)) {
+ reason.set(REASON_USER_DATABASE_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (excludeCatalogRules.contains(catalog)) {
+ reason.set(REASON_COMMON_CATALOG_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (userCollection.excludeCatalogRules.contains(catalog)) {
+ reason.set(REASON_USER_CATALOG_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (includeCatalogRules.contains(catalog)) {
+ reason.set(REASON_COMMON_CATALOG_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (userCollection.includeCatalogRules.contains(catalog)) {
+ reason.set(REASON_USER_CATALOG_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+ if (userCollection.excludeGlobal) {
+ reason.set(REASON_USER_GLOBAL_BLACKLIST);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+ if (userCollection.includeGlobal) {
+ reason.set(REASON_USER_GLOBAL_WHITELIST);
+ logAdmission(true, userIdentity, catalog, database, table,
reason.get());
+ return true;
+ }
+
+ reason.set(REASON_DEFAULT);
+ logAdmission(false, userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+
+ private boolean containsKeyValue(Map<String, Set<String>> map, String key, String value) {
+ Set<String> set = map.get(key);
+ return set != null && set.contains(value);
+ }
+
+ private void logAdmission(boolean admitted, String userIdentity, String catalog, String database,
+ String table, String reason) {
+ if (LOG.isDebugEnabled()) {
+ String status = admitted ? "admitted" : "denied";
+
+ String logMessage = String.format(
+ "File cache request %s by %s, user_identity: %s, "
+ + "catalog: %s, database: %s, table: %s",
+ status, reason, userIdentity, catalog, database,
table);
+
+ LOG.debug(logMessage);
+ }
+ }
+
+ public RuleLevel getRuleLevel(RulePattern rulePattern) {
+ int pattern = 0;
+ if (!rulePattern.getPartitionPattern().isEmpty()) {
+ pattern |= 1;
+ }
+ if (!rulePattern.getTable().isEmpty()) {
+ pattern |= 1 << 1;
+ }
+ if (!rulePattern.getDatabase().isEmpty()) {
+ pattern |= 1 << 2;
+ }
+ if (!rulePattern.getCatalog().isEmpty()) {
+ pattern |= 1 << 3;
+ }
+
+ RuleLevel[] levelTable = {
+ /* 0000 */ RuleLevel.GLOBAL, // 0
+ /* 0001 */ RuleLevel.INVALID, // 1
+ /* 0010 */ RuleLevel.INVALID, // 2
+ /* 0011 */ RuleLevel.INVALID, // 3
+ /* 0100 */ RuleLevel.INVALID, // 4
+ /* 0101 */ RuleLevel.INVALID, // 5
+ /* 0110 */ RuleLevel.INVALID, // 6
+ /* 0111 */ RuleLevel.INVALID, // 7
+ /* 1000 */ RuleLevel.CATALOG, // 8
+ /* 1001 */ RuleLevel.INVALID, // 9
+ /* 1010 */ RuleLevel.INVALID, // 10
+ /* 1011 */ RuleLevel.INVALID, // 11
+ /* 1100 */ RuleLevel.DATABASE, // 12
+ /* 1101 */ RuleLevel.INVALID, // 13
+ /* 1110 */ RuleLevel.TABLE, // 14
+ /* 1111 */ RuleLevel.PARTITION // 15
+ };
+
+ return levelTable[pattern];
+ }
+
+ public void add(RulePattern rulePattern) {
+ RuleLevel ruleLevel = getRuleLevel(rulePattern);
+ if (ruleLevel == RuleLevel.INVALID) {
+ return;
+ }
+
+ Set<String> catalogRules = (rulePattern.getRuleType() ==
RuleType.EXCLUDE)
+ ? excludeCatalogRules : includeCatalogRules;
+ Map<String, Set<String>> databaseRules =
(rulePattern.getRuleType() == RuleType.EXCLUDE)
+ ? excludeDatabaseRules : includeDatabaseRules;
+ Map<String, Set<String>> tableRules = (rulePattern.getRuleType()
== RuleType.EXCLUDE)
+ ? excludeTableRules : includeTableRules;
+
+ switch (ruleLevel) {
+ case GLOBAL:
+ if (rulePattern.getRuleType() == RuleType.EXCLUDE) {
+ excludeGlobal = true;
+ } else {
+ includeGlobal = true;
+ }
+ break;
+ case CATALOG:
+ catalogRules.add(rulePattern.getCatalog());
+ break;
+ case DATABASE:
+ databaseRules.computeIfAbsent(rulePattern.getDatabase(), k
-> new HashSet<>())
+ .add(rulePattern.getCatalog());
+ break;
+ case TABLE:
+ String catalogDatabase = rulePattern.getCatalog() + "." +
rulePattern.getDatabase();
+ tableRules.computeIfAbsent(rulePattern.getTable(), k ->
new HashSet<>())
+ .add(catalogDatabase);
+ break;
+ case PARTITION:
+ // TODO: Implementing partition-level rules
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ // Rule management supporting concurrent reads and writes.
+ // Thread safety is provided by the ReentrantReadWriteLock in FileCacheAdmissionManager.
+ public static class RuleManager {
+ // Characters in ASCII order: A-Z, then other symbols, then a-z
+ private static final int PARTITION_COUNT = 58;
+ private final List<Map<String, RuleCollection>> maps;
+ private final RuleCollection commonCollection;
+
+ static List<String> otherReasons = new ArrayList<>(Arrays.asList(
+ "empty user_identity",
+ "invalid user_identity"
+ ));
+
+ public RuleManager() {
+ maps = new ArrayList<>(PARTITION_COUNT);
+ commonCollection = new RuleCollection();
+
+ for (int i = 0; i < PARTITION_COUNT; i++) {
+ maps.add(new HashMap<>());
+ }
+ }
+
+ private int getIndex(char firstChar) {
+ return firstChar - 'A';
+ }
+
+ public void initialize(List<AdmissionRule> rules) {
+ for (AdmissionRule rule : rules) {
+ if (!rule.getEnabled()) {
+ continue;
+ }
+
+ RulePattern rulePattern = rule.toRulePattern();
+
+ if (rulePattern.getUserIdentity().isEmpty()) {
+ commonCollection.add(rulePattern);
+ continue;
+ }
+
+ char firstChar = rulePattern.getUserIdentity().charAt(0);
+ if (!((firstChar >= 'A' && firstChar <= 'Z') || (firstChar >= 'a' && firstChar <= 'z'))) {
+ continue;
+ }
+
+ int index = getIndex(firstChar);
+ maps.get(index).computeIfAbsent(rulePattern.getUserIdentity(),
+ k -> new RuleCollection()).add(rulePattern);
+ }
+ }
+
+ public boolean isAdmittedAtTableLevel(String userIdentity, String catalog, String database, String table,
+ AtomicReference<String> reason) {
+ if (userIdentity.isEmpty()) {
+ reason.set(otherReasons.get(0));
+ logDefaultAdmission(userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+
+ char firstChar = userIdentity.charAt(0);
+ if (!((firstChar >= 'A' && firstChar <= 'Z') || (firstChar >= 'a' && firstChar <= 'z'))) {
+ reason.set(otherReasons.get(1));
+ logDefaultAdmission(userIdentity, catalog, database, table,
reason.get());
+ return false;
+ }
+
+ int index = getIndex(firstChar);
+ RuleCollection collection = maps.get(index).get(userIdentity);
+ if (collection == null) {
+ return commonCollection.isAdmittedAtTableLevel(userIdentity,
catalog, database, table, reason);
+ } else {
+ return commonCollection.isAdmittedAtTableLevel(
+ collection, userIdentity, catalog, database, table,
reason);
+ }
+ }
+
+ private void logDefaultAdmission(String userIdentity, String catalog, String database, String table,
+ String reason) {
+ if (LOG.isDebugEnabled()) {
+ String decision = "denied";
+
+ String logMessage = String.format(
+ "File cache request %s by default rule, "
+ + "user_identity: %s, catalog: %s, database: %s, table: %s, reason: %s",
+ decision, userIdentity, catalog, database, table, reason);
+
+ LOG.debug(logMessage);
+ }
+ }
+ }
+
+ private RuleManager ruleManager;
+
+ private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();
+
+ private static final FileCacheAdmissionManager INSTANCE = new FileCacheAdmissionManager();
+
+ private ConfigWatcher watcher;
+
+ public FileCacheAdmissionManager() {
+ this.ruleManager = new RuleManager();
+ }
+
+ public static FileCacheAdmissionManager getInstance() {
+ return INSTANCE;
+ }
+
+ public void initialize(List<AdmissionRule> rules) {
+ ruleManager.initialize(rules);
+ }
+
+ public boolean isAdmittedAtTableLevel(String userIdentity, String catalog, String database, String table,
+ AtomicReference<String> reason) {
+ readLock.lock();
+ try {
+ return ruleManager.isAdmittedAtTableLevel(userIdentity, catalog, database, table, reason);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public void loadRules(String filePath) {
+ if (filePath == null || filePath.isEmpty()) {
+ LOG.warn("File cache admission JSON file path is not configured, admission control will be disabled.");
+ return;
+ }
+
+ try {
+ List<AdmissionRule> loadedRules = RuleLoader.loadRulesFromFile(filePath);
+ LOG.info("{} rules loaded successfully from file: {}", loadedRules.size(), filePath);
+
+ RuleManager newRuleManager = new RuleManager();
+ newRuleManager.initialize(loadedRules);
+
+ writeLock.lock();
+ try {
+ ruleManager = newRuleManager;
+ } finally {
+ writeLock.unlock();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to load file cache admission rules from file: {}", filePath, e);
+ }
+ }
+
+ public void loadRules() {
+ if (Config.file_cache_admission_control_json_dir == null
+ || Config.file_cache_admission_control_json_dir.isEmpty()) {
+ LOG.warn("File cache admission JSON directory is not configured, admission control will be disabled.");
+ return;
+ }
+
+ try {
+ File ruleDir = new File(Config.file_cache_admission_control_json_dir);
+
+ if (!ruleDir.exists()) {
+ LOG.warn("File cache admission JSON directory does not exist: {}",
+ Config.file_cache_admission_control_json_dir);
+ return;
+ }
+
+ if (!ruleDir.isDirectory()) {
+ LOG.error("File cache admission JSON directory is not a directory: {}",
+ Config.file_cache_admission_control_json_dir);
+ return;
+ }
+
+ File[] jsonFiles = ruleDir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.toLowerCase().endsWith(".json");
+ }
+ });
+
+ if (jsonFiles == null) {
+ LOG.error("Failed to list JSON files in directory: {}",
+ Config.file_cache_admission_control_json_dir);
+ return;
+ }
+
+ LOG.info("Found {} JSON files in admission rule directory: {}",
+ jsonFiles.length, Config.file_cache_admission_control_json_dir);
+
+ List<AdmissionRule> allRules = new ArrayList<>();
+
+ // Duplicate rule handling: only rules with `enabled=true` are stored.
+ // A rule is considered duplicate if its `userIdentity`, `catalog`, `database`, `table`,
+ // `partitionPattern`, and `ruleType` all match another rule, regardless of their `enabled` flag.
+ // Duplicate enabled rules are automatically deduplicated during processing.
+ for (File jsonFile : jsonFiles) {
+ List<AdmissionRule> loadedRules = RuleLoader.loadRulesFromFile(jsonFile.getPath());
+ LOG.info("{} rules loaded successfully from JSON file: {}", loadedRules.size(),
+ jsonFile.getPath());
+
+ allRules.addAll(loadedRules);
+ }
+
+ RuleManager newRuleManager = new RuleManager();
+ newRuleManager.initialize(allRules);
+
+ writeLock.lock();
+ try {
+ ruleManager = newRuleManager;
+ } finally {
+ writeLock.unlock();
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to load file cache admission rules from directory: {}",
+ Config.file_cache_admission_control_json_dir, e);
+ }
+ }
+
+ // Reloads all JSON rules and replaces the RuleManager
+ // when any .json file is created, modified, or deleted.
+ public void loadOnStartup() {
+ if (Config.file_cache_admission_control_json_dir == null
+ || Config.file_cache_admission_control_json_dir.isEmpty()) {
+ LOG.warn("File cache admission JSON directory is not configured, skip loading.");
+ return;
+ }
+
+ File ruleDir = new File(Config.file_cache_admission_control_json_dir);
+ if (!ruleDir.exists() || !ruleDir.isDirectory()) {
+ LOG.warn("File cache admission JSON directory does not exist or is not a directory: {}, skip loading.",
+ Config.file_cache_admission_control_json_dir);
+ return;
+ }
+
+ LOG.info("Loading file cache admission rules...");
+ loadRules();
+
+ LOG.info("Starting file cache admission rules refreshing task");
+ watcher = new ConfigWatcher(Config.file_cache_admission_control_json_dir);
+ watcher.setOnCreateConsumer(filePath -> {
+ String fileName = filePath.toString();
+ if (fileName.endsWith(".json")) {
+ loadRules();
+ }
+ });
+ watcher.setOnDeleteConsumer(filePath -> {
+ String fileName = filePath.toString();
+ if (fileName.endsWith(".json")) {
+ loadRules();
+ }
+ });
+ watcher.setOnModifyConsumer(filePath -> {
+ String fileName = filePath.toString();
+ if (fileName.endsWith(".json")) {
+ loadRules();
+ }
+ });
+ watcher.start();
+
+ LOG.info("Started file cache admission rules refreshing task");
+ }
+}
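[Editor's note on the partition scheme above] The `RuleManager` buckets rules by the first character of `user_identity`: ASCII `'A'` (65) through `'z'` (122) spans exactly 58 code points, hence `PARTITION_COUNT = 58`. The code points between `'Z'` and `'a'` (`[ \ ] ^ _` and a backtick) are rejected before indexing, so those buckets simply stay empty. A minimal standalone sketch of the arithmetic (illustrative, not the actual Doris class):

```java
public class BucketIndexSketch {
    // ASCII 'A' (65) .. 'z' (122) spans 58 code points, hence 58 buckets.
    static final int PARTITION_COUNT = 'z' - 'A' + 1; // 58

    // Matches the first-character validity check in RuleManager.
    static boolean isValidFirstChar(char c) {
        return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z');
    }

    // Same index computation as RuleManager.getIndex.
    static int getIndex(char firstChar) {
        return firstChar - 'A';
    }

    public static void main(String[] args) {
        System.out.println(PARTITION_COUNT);       // 58
        System.out.println(getIndex('A'));         // 0
        System.out.println(getIndex('z'));         // 57
        System.out.println(isValidFirstChar('_')); // false (symbol between 'Z' and 'a')
    }
}
```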
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index f1a37fbeff0..121ab52ab3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -22,13 +22,16 @@ import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
@@ -67,12 +70,16 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.URI;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
/**
 * FileQueryScanNode for querying the file access type of catalog, now only support
@@ -98,6 +105,12 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected FileSplitter fileSplitter;
+ // The data cache function only works for queries on Hive, Iceberg, Hudi (via HMS), and Paimon tables.
+ // See: https://doris.incubator.apache.org/docs/dev/lakehouse/data-cache
+ private static final Set<String> CACHEABLE_CATALOGS = new HashSet<>(
+ Arrays.asList("hms", "iceberg", "paimon")
+ );
+
/**
* External file scan node for Query hms table
 * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -311,11 +324,18 @@ public abstract class FileQueryScanNode extends FileScanNode {
int numBackends = backendPolicy.numBackends();
List<String> pathPartitionKeys = getPathPartitionKeys();
+
+ boolean admissionResult = true;
+ if (ConnectContext.get().getSessionVariable().isEnableFileCache()
+ && Config.enable_file_cache_admission_control) {
+ admissionResult = fileCacheAdmissionCheck();
+ }
+
if (isBatchMode()) {
// File splits are generated lazily, and fetched by backends while scanning.
// Only provide the unique ID of split source to backend.
- splitAssignment = new SplitAssignment(
- backendPolicy, this, this::splitToScanRange, locationProperties, pathPartitionKeys);
+ splitAssignment = new SplitAssignment(backendPolicy, this, this::splitToScanRange,
+ locationProperties, pathPartitionKeys, admissionResult);
splitAssignment.init();
if (executor != null) {
executor.getSummaryProfile().setGetSplitsFinishTime();
@@ -369,7 +389,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
for (Backend backend : assignment.keySet()) {
Collection<Split> splits = assignment.get(backend);
for (Split split : splits) {
- scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys));
+ scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys,
+ admissionResult));
totalFileSize += split.getLength();
}
scanBackendIds.add(backend.getId());
@@ -394,7 +415,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
Backend backend,
Map<String, String> locationProperties,
Split split,
- List<String> pathPartitionKeys) throws UserException {
+ List<String> pathPartitionKeys,
+ boolean admissionResult) throws UserException {
FileSplit fileSplit = (FileSplit) split;
TScanRangeLocations curLocations = newLocations();
// If fileSplit has partition values, use the values collected from hive partitions.
@@ -414,6 +436,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
// set file format type, and the type might fall back to native format in setScanParams
rangeDesc.setFormatType(getFileFormatType());
setScanParams(rangeDesc, fileSplit);
+ rangeDesc.setFileCacheAdmission(admissionResult);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
@@ -651,6 +674,55 @@ public abstract class FileQueryScanNode extends FileScanNode {
return this.scanParams;
}
+ protected boolean fileCacheAdmissionCheck() throws UserException {
+ boolean admissionResultAtTableLevel = true;
+ TableIf tableIf = getTargetTable();
+ String table = tableIf.getName();
+
+ if (tableIf instanceof ExternalTable) {
+ ExternalTable externalTableIf = (ExternalTable) tableIf;
+ String database = tableIf.getDatabase().getFullName();
+ String catalog = externalTableIf.getCatalog().getName();
+
+ if (CACHEABLE_CATALOGS.contains(externalTableIf.getCatalog().getType())) {
+ UserIdentity currentUser = ConnectContext.get().getCurrentUserIdentity();
+ String userIdentity = currentUser.getQualifiedUser() + "@" + currentUser.getHost();
+
+ AtomicReference<String> reason = new AtomicReference<>("");
+
+ long startTime = System.nanoTime();
+
+ admissionResultAtTableLevel = FileCacheAdmissionManager.getInstance().isAdmittedAtTableLevel(
+ userIdentity, catalog, database, table, reason);
+
+ long endTime = System.nanoTime();
+ double durationMs = (double) (endTime - startTime) / 1_000_000;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File cache admission control cost {} ms", String.format("%.6f", durationMs));
+ }
+
+ addFileCacheAdmissionLog(userIdentity, admissionResultAtTableLevel, reason.get(), durationMs);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip file cache admission control for non-cacheable table: {}.{}.{}",
+ catalog, database, table);
+ }
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ DatabaseIf databaseIf = tableIf.getDatabase();
+ String database = databaseIf == null ? "null" : databaseIf.getFullName();
+ String catalog = databaseIf == null || databaseIf.getCatalog() == null
+ ? "null" : databaseIf.getCatalog().getName();
+ LOG.debug("Skip file cache admission control for non-external table: {}.{}.{}",
+ catalog, database, table);
+ }
+ }
+
+ return admissionResultAtTableLevel;
+ }
+
protected long applyMaxFileSplitNumLimit(long targetSplitSize, long totalFileSize) {
int maxFileSplitNum = sessionVariable.getMaxFileSplitNum();
if (maxFileSplitNum <= 0 || totalFileSize <= 0) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index f00713810cf..50cc69d0152 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -54,6 +54,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -72,10 +73,13 @@ public abstract class FileScanNode extends ExternalScanNode {
// For display pushdown agg result
protected long tableLevelRowCount = -1;
+ protected List<String> fileCacheAdmissionLogs;
+
public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
ScanContext scanContext, boolean needCheckColumnPriv) {
super(id, desc, planNodeName, scanContext, needCheckColumnPriv);
this.needCheckColumnPriv = needCheckColumnPriv;
+ this.fileCacheAdmissionLogs = new ArrayList<>();
}
@Override
@@ -226,6 +230,11 @@ public abstract class FileScanNode extends ExternalScanNode {
.map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
}
+
+ for (String admissionLog : fileCacheAdmissionLogs) {
+ output.append(prefix).append(admissionLog).append("\n");
+ }
+
return output.toString();
}
@@ -296,4 +305,11 @@ public abstract class FileScanNode extends ExternalScanNode {
}
}
}
+
+ protected void addFileCacheAdmissionLog(String userIdentity, Boolean admitted, String reason, double durationMs) {
+ String admissionStatus = admitted ? "ADMITTED" : "DENIED";
+ String admissionLog = String.format("file cache request %s: user_identity:%s, reason:%s, cost:%.6f ms",
+ admissionStatus, userIdentity, reason, durationMs);
+ fileCacheAdmissionLogs.add(admissionLog);
+ }
}
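[Editor's note on the reload pattern above] `FileCacheAdmissionManager.loadRules` builds a complete `RuleManager` outside any lock, then swaps the reference under the write lock, so readers never observe a half-initialized rule set and the write-lock critical section is a single assignment. A minimal standalone sketch of that copy-on-write pattern (names illustrative, not the Doris types):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CopyOnWriteReloadSketch {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private List<String> rules = new ArrayList<>();

    // Readers take the cheap shared read lock; the list they see is always fully built.
    public int ruleCount() {
        rwLock.readLock().lock();
        try {
            return rules.size();
        } finally {
            rwLock.readLock().unlock();
        }
    }

    // All expensive work happens before the brief exclusive section;
    // the only mutation under the write lock is the reference swap.
    public void reload(List<String> freshRules) {
        List<String> rebuilt = new ArrayList<>(freshRules); // built outside the lock
        rwLock.writeLock().lock();
        try {
            rules = rebuilt;
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        CopyOnWriteReloadSketch m = new CopyOnWriteReloadSketch();
        m.reload(List.of("rule_a", "rule_b"));
        System.out.println(m.ruleCount()); // 2
    }
}
```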
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
index cc17818d6b5..5f79a006a7a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
@@ -53,6 +53,7 @@ public class SplitAssignment {
private final SplitToScanRange splitToScanRange;
private final Map<String, String> locationProperties;
private final List<String> pathPartitionKeys;
+ private final boolean fileCacheAdmission;
private final Object assignLock = new Object();
private Split sampleSplit = null;
private final AtomicBoolean isStopped = new AtomicBoolean(false);
@@ -66,12 +67,14 @@ public class SplitAssignment {
SplitGenerator splitGenerator,
SplitToScanRange splitToScanRange,
Map<String, String> locationProperties,
- List<String> pathPartitionKeys) {
+ List<String> pathPartitionKeys,
+ boolean fileCacheAdmission) {
this.backendPolicy = backendPolicy;
this.splitGenerator = splitGenerator;
this.splitToScanRange = splitToScanRange;
this.locationProperties = locationProperties;
this.pathPartitionKeys = pathPartitionKeys;
+ this.fileCacheAdmission = fileCacheAdmission;
}
public void init() throws UserException {
@@ -107,7 +110,8 @@ public class SplitAssignment {
Collection<Split> splits = batch.get(backend);
List<TScanRangeLocations> locations = new ArrayList<>(splits.size());
for (Split split : splits) {
- locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys));
+ locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys,
+ fileCacheAdmission));
}
while (needMoreSplit()) {
BlockingQueue<Collection<TScanRangeLocations>> queue =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java
index 0e890252857..bea93e99adc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitToScanRange.java
@@ -30,5 +30,6 @@ public interface SplitToScanRange {
Backend backend,
Map<String, String> locationProperties,
Split split,
- List<String> pathPartitionKeys) throws UserException;
+ List<String> pathPartitionKeys,
+ boolean fileCacheAdmission) throws UserException;
}
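[Editor's note on duplicate handling] The `loadRules` comment above defines two rules as duplicates when `userIdentity`, `catalog`, `database`, `table`, `partitionPattern`, and `ruleType` all match, regardless of `enabled`. A standalone sketch of that semantics (the record and key format here are illustrative, not Doris code):

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RuleDedupSketch {
    // Illustrative stand-in for AdmissionRule; only the dedup-relevant fields.
    record Rule(String userIdentity, String catalog, String database,
                String table, String partitionPattern, int ruleType, boolean enabled) {
        // Composite key over the six fields that define a duplicate; `enabled` excluded.
        String dedupKey() {
            return String.join("|", userIdentity, catalog, database, table,
                    partitionPattern, String.valueOf(ruleType));
        }
    }

    // Keeps only enabled rules and drops later rules whose key was already seen.
    static List<Rule> dedup(List<Rule> rules) {
        Set<String> seen = new LinkedHashSet<>();
        return rules.stream()
                .filter(Rule::enabled)
                .filter(r -> seen.add(r.dedupKey()))
                .toList();
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(
                new Rule("u1", "c1", "d1", "t1", "", 1, true),
                new Rule("u1", "c1", "d1", "t1", "", 1, true),  // duplicate of the first
                new Rule("u1", "c1", "d1", "t1", "", 1, false), // disabled, dropped
                new Rule("u1", "c1", "d1", "t2", "", 1, true)); // distinct table, kept
        System.out.println(dedup(rules).size()); // 2
    }
}
```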
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index c4045ea9c4a..4a15f1736fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -60,6 +60,7 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.datasource.FileCacheAdmissionManager;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.dictionary.LayoutType;
import org.apache.doris.info.TableNameInfo;
@@ -1104,6 +1105,8 @@ import org.antlr.v4.runtime.Token;
import org.antlr.v4.runtime.tree.ParseTree;
import org.antlr.v4.runtime.tree.RuleNode;
import org.antlr.v4.runtime.tree.TerminalNode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -1119,6 +1122,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -1127,6 +1131,8 @@ import java.util.stream.Collectors;
*/
@SuppressWarnings({"OptionalUsedAsFieldOrParameterType", "OptionalGetWithoutIsPresent"})
public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
+ private static final Logger LOG = LogManager.getLogger(LogicalPlanBuilder.class);
+
private static String JOB_NAME = "jobName";
private static String TASK_ID = "taskId";
@@ -9292,6 +9298,47 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
return new WarmUpClusterCommand(warmUpItems, srcCluster, dstCluster, isForce, isWarmUpWithTable, properties);
}
+ void fileCacheAdmissionCheck(DorisParser.WarmUpSelectContext ctx) {
+ DorisParser.WarmUpSingleTableRefContext tableRef = ctx.warmUpSingleTableRef();
+ List<String> identifierParts = visitMultipartIdentifier(tableRef.multipartIdentifier());
+
+ int partCount = identifierParts.size();
+ String table = identifierParts.get(partCount - 1);
+ String database = (partCount >= 2)
+ ? identifierParts.get(partCount - 2) : ConnectContext.get().getDatabase();
+ String catalog = (partCount == 3)
+ ? identifierParts.get(partCount - 3) : ConnectContext.get().getCurrentCatalog().getName();
+
+ UserIdentity currentUser = ConnectContext.get().getCurrentUserIdentity();
+ String userIdentity = currentUser.getQualifiedUser() + "@" + currentUser.getHost();
+
+ if (!"internal".equals(catalog)) {
+ AtomicReference<String> reason = new AtomicReference<>("");
+
+ long startTime = System.nanoTime();
+
+ boolean admissionResultAtTableLevel = FileCacheAdmissionManager.getInstance().isAdmittedAtTableLevel(
+ userIdentity, catalog, database, table, reason);
+
+ long endTime = System.nanoTime();
+ double durationMs = (double) (endTime - startTime) / 1_000_000;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("File cache admission control cost {} ms", String.format("%.6f", durationMs));
+ }
+
+ if (!admissionResultAtTableLevel) {
+ throw new AnalysisException("WARM UP SELECT denied by file cache admission control, reason: "
+ + reason);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip file cache admission control for non-external table: {}.{}",
+ database, table);
+ }
+ }
+ }
+
@Override
public LogicalPlan visitWarmUpSelect(DorisParser.WarmUpSelectContext ctx) {
LogicalPlan relation =
visitWarmUpSingleTableRef(ctx.warmUpSingleTableRef());
@@ -9319,6 +9366,10 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
+ " disable_file_cache=false in cloud mode");
}
+ if (Config.enable_file_cache_admission_control) {
+ fileCacheAdmissionCheck(ctx);
+ }
+
UnboundBlackholeSink<?> sink = new UnboundBlackholeSink<>(project, new UnboundBlackholeSinkContext(true));
LogicalPlan command = new WarmupSelectCommand(sink);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index d52977bf2a2..499b6086e66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -725,4 +725,8 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
public long getCatalogId() {
return Env.getCurrentInternalCatalog().getId();
}
+
+ protected boolean fileCacheAdmissionCheck() throws UserException {
+ return true;
+ }
}
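[Editor's note on the rule-file format] The tests below serialize `AdmissionRule` objects to a JSON file with Jackson and feed it to `loadRules`. Based on the constructor argument order used in the tests (id, userIdentity, catalog, database, table, partitionPattern, ruleType, enabled, createdTime, updatedTime), a rules file plausibly looks like the following. The field names are assumptions inferred from Jackson's default getter-based serialization, not a confirmed schema; `ruleType` 1 appears to mean whitelist and 0 blacklist, per the test expectations.

```json
[
  {
    "id": 1,
    "userIdentity": "",
    "catalog": "catalog_1",
    "database": "",
    "table": "",
    "partitionPattern": "",
    "ruleType": 1,
    "enabled": true,
    "createdTime": 0,
    "updatedTime": 0
  }
]
```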
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionManagerTest.java
new file mode 100644
index 00000000000..383c526ccaa
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionManagerTest.java
@@ -0,0 +1,437 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FileCacheAdmissionManagerTest {
+
+ private FileCacheAdmissionManager manager;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void setUp() {
+ manager = new FileCacheAdmissionManager();
+ }
+
+ @Test
+ public void testEmptyUserIdentity() {
+ AtomicReference<String> reason = new AtomicReference<>();
+ boolean result = manager.isAdmittedAtTableLevel("", "catalog", "database", "table", reason);
+ Assert.assertFalse(result);
+ Assert.assertEquals("empty user_identity", reason.get());
+ }
+
+ @Test
+ public void testInvalidUserIdentity() {
+ AtomicReference<String> reason = new AtomicReference<>();
+ boolean result = manager.isAdmittedAtTableLevel("123user", "catalog", "database", "table", reason);
+ Assert.assertFalse(result);
+ Assert.assertEquals("invalid user_identity", reason.get());
+ }
+
+ @Test
+ public void testCommonRule() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 1L, "", "catalog_1", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 2L, "", "catalog_2", "database_1", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 3L, "", "catalog_3", "database_2", "table_1", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-common.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user", "catalog_1", "database", "table", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("common catalog-level whitelist rule", reason1.get());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user", "catalog_2", "database_1", "table", reason2);
+ Assert.assertTrue(result2);
+ Assert.assertEquals("common database-level whitelist rule", reason2.get());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user", "catalog_3", "database_2", "table_1", reason3);
+ Assert.assertTrue(result3);
+ Assert.assertEquals("common table-level whitelist rule", reason3.get());
+ }
+
+ @Test
+ public void testRuleEnabled() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 1L, "", "catalog_1", "", "", "",
+ 1, false, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 2L, "", "catalog_2", "database_1", "", "",
+ 1, false, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 3L, "", "catalog_3", "database_2", "table_1", "",
+ 1, false, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-enabled.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user", "catalog_1", "database", "table", reason1);
+ Assert.assertFalse(result1);
+ Assert.assertEquals("default rule", reason1.get());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user", "catalog_2", "database_1", "table", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("default rule", reason2.get());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user", "catalog_3", "database_2", "table_1", reason3);
+ Assert.assertFalse(result3);
+ Assert.assertEquals("default rule", reason3.get());
+ }
+
+ @Test
+ public void testUserRule() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 4L, "user_1", "catalog_4", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 5L, "user_1", "catalog_5", "database_4", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 6L, "user_1", "catalog_6", "database_5", "table_4", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-user.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_1", "catalog_4", "database", "table", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("user catalog-level whitelist rule", reason1.get());
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_2", "catalog_4", "database", "table", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("default rule", reason2.get());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_1", "catalog_5", "database_4", "table", reason3);
+ Assert.assertTrue(result3);
+ Assert.assertEquals("user database-level whitelist rule", reason3.get());
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_2", "catalog_5", "database_4", "table", reason4);
+ Assert.assertFalse(result4);
+ Assert.assertEquals("default rule", reason4.get());
+
+ AtomicReference<String> reason5 = new AtomicReference<>();
+ boolean result5 = manager.isAdmittedAtTableLevel("user_1", "catalog_6", "database_5", "table_4", reason5);
+ Assert.assertTrue(result5);
+ Assert.assertEquals("user table-level whitelist rule", reason5.get());
+ AtomicReference<String> reason6 = new AtomicReference<>();
+ boolean result6 = manager.isAdmittedAtTableLevel("user_2", "catalog_6", "database_5", "table_4", reason6);
+ Assert.assertFalse(result6);
+ Assert.assertEquals("default rule", reason6.get());
+ }
+
+ @Test
+ public void testRuleLevelPriority() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 7L, "user_3", "", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-priority.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_3", "catalog", "database", "table", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("user global-level whitelist rule", reason1.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 8L, "user_3", "catalog", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_3", "catalog", "database", "table", reason2);
+ Assert.assertTrue(result2);
+ Assert.assertEquals("user catalog-level whitelist rule", reason2.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 9L, "user_3", "catalog", "database", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_3", "catalog", "database", "table", reason3);
+ Assert.assertTrue(result3);
+ Assert.assertEquals("user database-level whitelist rule",
reason3.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 10L, "user_3", "catalog", "database", "table", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_3", "catalog", "database", "table", reason4);
+ Assert.assertTrue(result4);
+ Assert.assertEquals("user table-level whitelist rule", reason4.get());
+ }
+
+ @Test
+ public void testRuleTypePriority() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 11L, "user_4", "", "", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 12L, "user_4", "", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-type-priority.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_4", "catalog", "database", "table", reason1);
+ Assert.assertFalse(result1);
+ Assert.assertEquals("user global-level blacklist rule", reason1.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 13L, "user_4", "catalog", "", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 14L, "user_4", "catalog", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_4", "catalog", "database", "table", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("user catalog-level blacklist rule", reason2.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 15L, "user_4", "catalog", "database", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 16L, "user_4", "catalog", "database", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_4", "catalog",
"database", "table", reason3);
+ Assert.assertFalse(result3);
+ Assert.assertEquals("user database-level blacklist rule",
reason3.get());
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 17L, "user_4", "catalog", "database", "table", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 18L, "user_4", "catalog", "database", "table", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_4", "catalog", "database", "table", reason4);
+ Assert.assertFalse(result4);
+ Assert.assertEquals("user table-level blacklist rule", reason4.get());
+ }
+
+ @Test
+ public void testNestedRulePriorities() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 19L, "user_5", "catalog", "", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 20L, "user_5", "catalog", "database", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 21L, "user_6", "catalog", "", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 22L, "user_6", "catalog", "database", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 23L, "user_7", "catalog", "database", "", "",
+ 0, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 24L, "user_7", "catalog", "database", "table", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 25L, "user_8", "catalog", "database", "", "",
+ 1, true, createdTime, updatedTime
+ ));
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 26L, "user_8", "catalog", "database", "table", "",
+ 0, true, createdTime, updatedTime
+ ));
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+ File jsonFile = tempFolder.newFile("rules-test-nested.json");
+ objectMapper.writeValue(jsonFile, rules);
+
+ manager.loadRules(jsonFile.getAbsolutePath());
+
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_5", "catalog", "database", "table", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("user database-level whitelist rule", reason1.get());
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_5", "catalog", "otherDatabase", "table", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("user catalog-level blacklist rule", reason2.get());
+
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_6", "catalog", "database", "table", reason3);
+ Assert.assertFalse(result3);
+ Assert.assertEquals("user database-level blacklist rule", reason3.get());
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_6", "catalog", "otherDatabase", "table", reason4);
+ Assert.assertTrue(result4);
+ Assert.assertEquals("user catalog-level whitelist rule", reason4.get());
+
+ AtomicReference<String> reason5 = new AtomicReference<>();
+ boolean result5 = manager.isAdmittedAtTableLevel("user_7", "catalog", "database", "table", reason5);
+ Assert.assertTrue(result5);
+ Assert.assertEquals("user table-level whitelist rule", reason5.get());
+ AtomicReference<String> reason6 = new AtomicReference<>();
+ boolean result6 = manager.isAdmittedAtTableLevel("user_7", "catalog", "database", "otherTable", reason6);
+ Assert.assertFalse(result6);
+ Assert.assertEquals("user database-level blacklist rule", reason6.get());
+
+ AtomicReference<String> reason7 = new AtomicReference<>();
+ boolean result7 = manager.isAdmittedAtTableLevel("user_8", "catalog", "database", "table", reason7);
+ Assert.assertFalse(result7);
+ Assert.assertEquals("user table-level blacklist rule", reason7.get());
+ AtomicReference<String> reason8 = new AtomicReference<>();
+ boolean result8 = manager.isAdmittedAtTableLevel("user_8", "catalog", "database", "otherTable", reason8);
+ Assert.assertTrue(result8);
+ Assert.assertEquals("user database-level whitelist rule", reason8.get());
+ }
+
+ @AfterClass
+ public static void deleteJsonFile() throws Exception {
+ }
+}
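For readers following the assertions above, here is a minimal, self-contained sketch of the priority semantics these tests pin down. It is an illustrative reconstruction, not the actual `FileCacheAdmissionManager` code; `Rule` and `isAdmitted` are hypothetical names introduced only for this sketch.

```java
// Illustrative sketch (NOT the Doris implementation) of the resolution order
// the tests above verify: the most specific matching scope wins
// (table > database > catalog > global), and within one scope a blacklist
// rule (rule_type 0) beats a whitelist rule (rule_type 1). With no matching
// rule, the "default rule" denies admission.
import java.util.Arrays;
import java.util.List;

public class AdmissionSketch {
    static final class Rule {
        final String user, catalog, db, table;
        final int type; // 0 = blacklist, 1 = whitelist
        Rule(String user, String catalog, String db, String table, int type) {
            this.user = user; this.catalog = catalog; this.db = db;
            this.table = table; this.type = type;
        }
    }

    static boolean isAdmitted(List<Rule> rules, String user, String catalog,
                              String db, String table) {
        // Scopes ordered from most to least specific; "" means "any".
        String[][] scopes = {
            {catalog, db, table}, {catalog, db, ""}, {catalog, "", ""}, {"", "", ""}};
        for (String[] s : scopes) {
            boolean whitelisted = false;
            for (Rule r : rules) {
                if (!r.user.equals(user) || !r.catalog.equals(s[0])
                        || !r.db.equals(s[1]) || !r.table.equals(s[2])) {
                    continue;
                }
                if (r.type == 0) {
                    return false; // blacklist wins within a scope
                }
                whitelisted = true;
            }
            if (whitelisted) {
                return true;
            }
        }
        return false; // default rule: deny admission
    }

    public static void main(String[] args) {
        List<Rule> rules = Arrays.asList(
                new Rule("u", "c", "", "", 0),   // catalog-level blacklist
                new Rule("u", "c", "d", "", 1)); // database-level whitelist
        // The narrower database-level whitelist overrides the blacklist:
        System.out.println(isAdmitted(rules, "u", "c", "d", "t"));     // true
        // Other databases still hit the catalog-level blacklist:
        System.out.println(isAdmitted(rules, "u", "c", "other", "t")); // false
    }
}
```

This mirrors the three test families: within one scope, blacklist beats whitelist (testRuleTypePriority); across scopes, the narrower rule wins regardless of type (testNestedRulePriorities); with no match, the default denies (the "default rule" assertions).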
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionRuleRefresherTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionRuleRefresherTest.java
new file mode 100644
index 00000000000..a5325a36f8c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileCacheAdmissionRuleRefresherTest.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.Config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import org.awaitility.Awaitility;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class FileCacheAdmissionRuleRefresherTest {
+
+ private static FileCacheAdmissionManager manager;
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ Path currentDir = Paths.get("").toAbsolutePath();
+ Path jsonFileDir = currentDir.resolve("jsonFileDir-test");
+
+ if (!Files.exists(jsonFileDir)) {
+ Files.createDirectories(jsonFileDir);
+ System.out.println("Directory created successfully: " + jsonFileDir.toAbsolutePath());
+ }
+
+ Config.file_cache_admission_control_json_dir = jsonFileDir.toString();
+
+ manager = new FileCacheAdmissionManager();
+ manager.loadOnStartup();
+ }
+
+ @Test
+ public void testJsonFileCreated() throws Exception {
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_1", "catalog_1", "database_1", "table_1", reason1);
+ Assert.assertFalse(result1);
+ Assert.assertEquals("default rule", reason1.get());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_2", "catalog_2", "database_2", "table_2", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("default rule", reason2.get());
+
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 1L, "user_1", "catalog_1", "database_1", "table_1", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ File jsonFile1 = new File(Config.file_cache_admission_control_json_dir, "rules_1.json");
+ objectMapper.writeValue(jsonFile1, rules);
+
+ rules.clear();
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 2L, "user_2", "catalog_2", "database_2", "table_2", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ File jsonFile2 = new File(Config.file_cache_admission_control_json_dir, "rules_2.json");
+ objectMapper.writeValue(jsonFile2, rules);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_1", "catalog_1", "database_1", "table_1", reason3);
+ Assert.assertTrue(result3);
+ Assert.assertEquals("user table-level whitelist rule", reason3.get());
+
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_2", "catalog_2", "database_2", "table_2", reason4);
+ Assert.assertTrue(result4);
+ Assert.assertEquals("user table-level whitelist rule", reason4.get());
+ });
+ }
+
+ @Test
+ public void testJsonFileDeleted() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 3L, "user_3", "catalog_3", "database_3", "table_3", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ File jsonFile3 = new File(Config.file_cache_admission_control_json_dir, "rules_3.json");
+ objectMapper.writeValue(jsonFile3, rules);
+
+ rules.clear();
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 4L, "user_4", "catalog_4", "database_4", "table_4", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ File jsonFile4 = new File(Config.file_cache_admission_control_json_dir, "rules_4.json");
+ objectMapper.writeValue(jsonFile4, rules);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_3", "catalog_3", "database_3", "table_3", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("user table-level whitelist rule", reason1.get());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_4", "catalog_4", "database_4", "table_4", reason2);
+ Assert.assertTrue(result2);
+ Assert.assertEquals("user table-level whitelist rule", reason2.get());
+ });
+
+ Assert.assertTrue(jsonFile4.delete());
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_3", "catalog_3", "database_3", "table_3", reason3);
+ Assert.assertTrue(result3);
+ Assert.assertEquals("user table-level whitelist rule", reason3.get());
+
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_4", "catalog_4", "database_4", "table_4", reason4);
+ Assert.assertFalse(result4);
+ Assert.assertEquals("default rule", reason4.get());
+ });
+ }
+
+ @Test
+ public void testJsonFileModified() throws Exception {
+ List<FileCacheAdmissionManager.AdmissionRule> rules = new ArrayList<>();
+ long createdTime = 0;
+ long updatedTime = 0;
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 5L, "user_5", "catalog_5", "database_5", "table_5", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ File jsonFile5 = new File(Config.file_cache_admission_control_json_dir, "rules_5.json");
+ objectMapper.writeValue(jsonFile5, rules);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ AtomicReference<String> reason1 = new AtomicReference<>();
+ boolean result1 = manager.isAdmittedAtTableLevel("user_5", "catalog_5", "database_5", "table_5", reason1);
+ Assert.assertTrue(result1);
+ Assert.assertEquals("user table-level whitelist rule", reason1.get());
+
+ AtomicReference<String> reason2 = new AtomicReference<>();
+ boolean result2 = manager.isAdmittedAtTableLevel("user_6", "catalog_6", "database_6", "table_6", reason2);
+ Assert.assertFalse(result2);
+ Assert.assertEquals("default rule", reason2.get());
+ });
+
+ rules.clear();
+ rules.add(new FileCacheAdmissionManager.AdmissionRule(
+ 6L, "user_6", "catalog_6", "database_6", "table_6", "",
+ 1, true, createdTime, updatedTime
+ ));
+
+ objectMapper.writeValue(jsonFile5, rules);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ AtomicReference<String> reason3 = new AtomicReference<>();
+ boolean result3 = manager.isAdmittedAtTableLevel("user_5", "catalog_5", "database_5", "table_5", reason3);
+ Assert.assertFalse(result3);
+ Assert.assertEquals("default rule", reason3.get());
+
+ AtomicReference<String> reason4 = new AtomicReference<>();
+ boolean result4 = manager.isAdmittedAtTableLevel("user_6", "catalog_6", "database_6", "table_6", reason4);
+ Assert.assertTrue(result4);
+ Assert.assertEquals("user table-level whitelist rule", reason4.get());
+ });
+ }
+
+ private static void deleteDirectoryRecursively(Path directory) throws IOException {
+ if (!Files.exists(directory)) {
+ return;
+ }
+
+ Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ if (exc != null) {
+ throw exc;
+ }
+ Files.delete(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ @AfterClass
+ public static void deleteJsonFile() throws Exception {
+ Path currentDir = Paths.get("").toAbsolutePath();
+ Path jsonFileDir = currentDir.resolve("jsonFileDir-test");
+
+ if (Files.exists(Paths.get(Config.file_cache_admission_control_json_dir))) {
+ deleteDirectoryRecursively(jsonFileDir);
+ }
+ }
+}
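The three tests above (create, delete, modify) all assume the manager's rule set converges to the on-disk directory contents within the Awaitility timeout. As a hedged illustration, one way such a refresher could detect changes is by polling a (file name -> mtime) snapshot of the rule directory; this is an assumption for the sketch, not a description of the actual `FileCacheAdmissionRuleRefresher` mechanism, and `RefreshSketch`/`snapshot` are hypothetical names.

```java
// Sketch only: a change in the (name -> mtime) snapshot between two polls
// signals that rule JSON files were added, removed, or modified, so the
// rules should be reloaded.
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

public class RefreshSketch {
    // Capture every *.json file in the directory with its last-modified time.
    static Map<String, Long> snapshot(Path dir) throws Exception {
        Map<String, Long> state = new TreeMap<>();
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir, "*.json")) {
            for (Path p : ds) {
                state.put(p.getFileName().toString(),
                        Files.getLastModifiedTime(p).toMillis());
            }
        }
        return state;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("rules-sketch");
        Map<String, Long> before = snapshot(dir);
        Files.write(dir.resolve("rules_1.json"), "[]".getBytes());
        Map<String, Long> after = snapshot(dir);
        // The created file changes the snapshot, which would trigger a reload.
        System.out.println(!after.equals(before)); // true
    }
}
```

A deleted file drops out of the snapshot and a modified file changes its mtime, which covers all three test scenarios; whatever mechanism the real refresher uses, the tests only require eventual convergence within 10 seconds.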
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
index ab5205b47a7..28b2c604fdf 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
@@ -76,7 +76,8 @@ public class SplitAssignmentTest {
mockSplitGenerator,
mockSplitToScanRange,
locationProperties,
- pathPartitionKeys
+ pathPartitionKeys,
+ true
);
}
@@ -92,7 +93,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
@@ -127,7 +129,8 @@ public class SplitAssignmentTest {
mockSplitGenerator,
mockSplitToScanRange,
locationProperties,
- pathPartitionKeys
+ pathPartitionKeys,
+ true
);
new MockUp<SplitAssignment>() {
@@ -196,7 +199,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
@@ -226,7 +230,8 @@ public class SplitAssignmentTest {
result = batch;
minTimes = 0;
- mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
minTimes = 0;
}
@@ -257,7 +262,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
@@ -287,7 +293,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
@@ -339,7 +346,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
@@ -383,7 +391,8 @@ public class SplitAssignmentTest {
mockBackendPolicy.computeScanRangeAssignment((List<Split>) any);
result = batch;
- mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys);
+ mockSplitToScanRange.getScanRange(mockBackend, locationProperties, mockSplit, pathPartitionKeys,
+ true);
result = mockScanRangeLocations;
}
};
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 4092ebff872..7328411d9ec 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -522,6 +522,7 @@ struct TFileRangeDesc {
14: optional i64 self_split_weight
// whether the value of columns_from_path is null
15: optional list<bool> columns_from_path_is_null;
+ 16: optional bool file_cache_admission;
}
struct TSplitSource {
diff --git a/tools/export_mysql_rule_to_json.sh b/tools/export_mysql_rule_to_json.sh
new file mode 100755
index 00000000000..0c44ff09524
--- /dev/null
+++ b/tools/export_mysql_rule_to_json.sh
@@ -0,0 +1,83 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -e
+
+# Configuration
+DB_HOST="localhost"
+DB_USER="root"
+DB_NAME="file_cache_admission_control"
+DB_PASS=""
+TABLE_NAME="admission_policy"
+OUTPUT_FILE="rule_$(date +%Y%m%d_%H%M%S).json"
+
+echo "=== Database Export Configuration ==="
+echo "Database Host: $DB_HOST"
+echo "Database User: $DB_USER"
+echo "Database Name: $DB_NAME"
+echo "Password: $(if [ -n "$DB_PASS" ]; then echo "Set"; else echo "Not set"; fi)"
+echo "Table Name: $TABLE_NAME"
+echo "Output File: $OUTPUT_FILE"
+echo "====================================="
+echo ""
+
+# Query and convert to JSON (including long type timestamps)
+QUERY=$(cat <<SQL
+SELECT
+ JSON_ARRAYAGG(
+ JSON_OBJECT(
+ 'id', id,
+ 'user_identity', user_identity,
+ 'catalog_name', IFNULL(catalog_name, ''),
+ 'database_name', IFNULL(database_name, ''),
+ 'table_name', IFNULL(table_name, ''),
+ 'partition_pattern', IFNULL(partition_pattern, ''),
+ 'rule_type', rule_type,
+ 'enabled', CASE WHEN enabled = 1 THEN true ELSE false END,
+ 'created_time', UNIX_TIMESTAMP(created_time),
+ 'updated_time', UNIX_TIMESTAMP(updated_time)
+ )
+ ) AS json_data
+FROM ${TABLE_NAME}
+SQL
+)
+
+# Execute query
+if [ -n "$DB_PASS" ]; then
+    JSON_DATA=$(echo "$QUERY" | mysql -h "$DB_HOST" -u "$DB_USER" -p"$DB_PASS" "$DB_NAME" -N 2>/dev/null)
+else
+    JSON_DATA=$(echo "$QUERY" | mysql -h "$DB_HOST" -u "$DB_USER" "$DB_NAME" -N)
+fi
+
+# Handle NULL
+if [ "$JSON_DATA" = "NULL" ] || [ -z "$JSON_DATA" ]; then
+ JSON_DATA="[]"
+fi
+
+# Save to file
+echo "$JSON_DATA" > "$OUTPUT_FILE"
+
+# Format
+if command -v jq &> /dev/null; then
+ jq '.' "$OUTPUT_FILE" | awk '
+ /^ {/ && NR > 3 {print ""}
+ {print}
+ ' > "${OUTPUT_FILE}.tmp" && mv "${OUTPUT_FILE}.tmp" "$OUTPUT_FILE"
+fi
+
+echo "Export completed: $OUTPUT_FILE"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]