This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 1adb4cfdd9 [Enhancement](tvf) Table value function support reading
local file (#22915)
1adb4cfdd9 is described below
commit 1adb4cfdd954648b9aae96f4c91a271422fb711c
Author: Mingyu Chen <[email protected]>
AuthorDate: Sun Aug 13 19:39:16 2023 +0800
[Enhancement](tvf) Table value function support reading local file (#22915)
cherry pick #17404
---
be/src/io/fs/err_utils.cpp | 20 +++
be/src/io/fs/err_utils.h | 1 +
be/src/io/fs/local_file_system.cpp | 50 +++++++
be/src/io/fs/local_file_system.h | 11 ++
be/src/olap/tablet_schema_cache.cpp | 1 -
be/src/service/internal_service.cpp | 22 +++
be/src/service/internal_service.h | 3 +
be/test/io/fs/local_file_system_test.cpp | 30 +++++
docs/en/docs/admin-manual/config/be-config.md | 5 +
.../sql-functions/table-functions/local.md | 150 +++++++++++++++++++++
docs/sidebars.json | 1 +
docs/zh-CN/docs/admin-manual/config/be-config.md | 5 +
.../sql-functions/table-functions/local.md | 147 ++++++++++++++++++++
.../doris/analysis/TableValuedFunctionRef.java | 8 +-
.../doris/planner/external/FileQueryScanNode.java | 4 +-
.../apache/doris/planner/external/TVFScanNode.java | 19 +++
.../org/apache/doris/rpc/BackendServiceClient.java | 4 +
.../org/apache/doris/rpc/BackendServiceProxy.java | 12 ++
.../ExternalFileTableValuedFunction.java | 21 +--
.../tablefunction/LocalTableValuedFunction.java | 145 ++++++++++++++++++++
.../doris/tablefunction/TableValuedFunctionIf.java | 2 +
gensrc/proto/internal_service.proto | 14 ++
.../external_table_p0/tvf/test_local_tvf.groovy | 67 +++++++++
23 files changed, 729 insertions(+), 13 deletions(-)
diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp
index 5530b25a0d..648a358850 100644
--- a/be/src/io/fs/err_utils.cpp
+++ b/be/src/io/fs/err_utils.cpp
@@ -53,5 +53,25 @@ std::string hdfs_error() {
return ss.str();
}
+// Formats a non-zero glob(3) return code as "(<code>), <reason>" for use in
+// error messages. The numeric cases follow glibc's <glob.h>:
+// https://sites.uclouvain.be/SystInfo/usr/include/glob.h.html
+// NOTE(review): these literals assume glibc's values; confirm for other libcs,
+// or compare against GLOB_NOSPACE/GLOB_ABORTED/GLOB_NOMATCH once <glob.h> is
+// included in this file.
+std::string glob_err_to_str(int code) {
+    const char* reason = "unknown";
+    if (code == 1) {        // GLOB_NOSPACE
+        reason = "Ran out of memory";
+    } else if (code == 2) { // GLOB_ABORTED
+        reason = "read error";
+    } else if (code == 3) { // GLOB_NOMATCH
+        reason = "No matches found";
+    }
+    return fmt::format("({}), {}", code, reason);
+}
+
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/err_utils.h b/be/src/io/fs/err_utils.h
index 31ca702c32..971596fab1 100644
--- a/be/src/io/fs/err_utils.h
+++ b/be/src/io/fs/err_utils.h
@@ -26,6 +26,7 @@ namespace io {
std::string errno_to_str();
std::string errcode_to_str(const std::error_code& ec);
std::string hdfs_error();
+std::string glob_err_to_str(int code);
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/local_file_system.cpp
b/be/src/io/fs/local_file_system.cpp
index 01cd8829dd..48c1981202 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -19,6 +19,7 @@
#include <fcntl.h>
#include <fmt/format.h>
+#include <glob.h>
#include <glog/logging.h>
#include <openssl/md5.h>
#include <sys/mman.h>
@@ -428,5 +429,54 @@ const std::shared_ptr<LocalFileSystem>&
global_local_filesystem() {
return local_fs;
}
+// Joins "dir" and "file_path" with '/', canonicalizes the result and, only if
+// contain_path() accepts it as lying inside "dir", stores it in "full_path".
+// Otherwise returns InvalidArgument and leaves "full_path" untouched.
+Status LocalFileSystem::canonicalize_local_file(const std::string& dir,
+                                                const std::string& file_path,
+                                                std::string* full_path) {
+    const std::string joined = dir + "/" + file_path;
+    std::string resolved;
+    RETURN_IF_ERROR(canonicalize(joined, &resolved));
+    if (contain_path(dir, resolved)) {
+        *full_path = std::move(resolved);
+        return Status::OK();
+    }
+    return Status::InvalidArgument("file path is not allowed: {}", resolved);
+}
+
+// Lists the files matching "path", a glob pattern that is prefixed with
+// config::user_files_secure_path + "/", and appends each match to "res" as a
+// canonicalized path plus its size. Any pattern containing ".." is rejected
+// before globbing.
+Status LocalFileSystem::safe_glob(const std::string& path, std::vector<FileInfo>* res) {
+    if (path.find("..") != std::string::npos) {
+        return Status::InvalidArgument("can not contain '..' in path");
+    }
+
+    std::vector<std::string> matches;
+    RETURN_IF_ERROR(_glob(config::user_files_secure_path + "/" + path, &matches));
+    for (const std::string& match : matches) {
+        FileInfo info;
+        info.is_file = true;
+        // NOTE(review): "dir" is empty here, so canonicalize_local_file() joins
+        // "/" + match and checks containment against "" rather than against
+        // user_files_secure_path. Confirm this is intended (e.g. to allow
+        // symlinked sub-directories such as log/) and that the secure path is
+        // always absolute.
+        RETURN_IF_ERROR(canonicalize_local_file("", match, &info.file_name));
+        RETURN_IF_ERROR(file_size_impl(info.file_name, &info.file_size));
+        res->push_back(std::move(info));
+    }
+    return Status::OK();
+}
+
+// Thin wrapper around POSIX glob(3): appends every path matching "pattern" to
+// "res", in the order glob() reports them. Any non-zero glob() result,
+// including "no match", is returned as an IOError.
+Status LocalFileSystem::_glob(const std::string& pattern, std::vector<std::string>* res) {
+    glob_t glob_result {};
+    // Release glob()'s buffers on every exit path, including when an
+    // allocation in the copy loop below throws.
+    struct GlobFreeGuard {
+        glob_t* g;
+        ~GlobFreeGuard() { globfree(g); }
+    } guard {&glob_result};
+
+    // GLOB_TILDE: expand a leading "~" in the pattern to the home directory.
+    const int rc = glob(pattern.c_str(), GLOB_TILDE, nullptr, &glob_result);
+    if (rc != 0) {
+        return Status::IOError("failed to glob {}: {}", pattern, glob_err_to_str(rc));
+    }
+
+    res->reserve(res->size() + glob_result.gl_pathc);
+    for (size_t i = 0; i < glob_result.gl_pathc; ++i) {
+        res->emplace_back(glob_result.gl_pathv[i]);
+    }
+    return Status::OK();
+}
+
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index d9c0ec96c8..1f8d35c096 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -72,6 +72,15 @@ public:
// read local file and save content to "content"
Status read_file_to_string(const Path& file, std::string* content);
+    // Canonicalizes "<dir>/<file_path>" and stores the result in "full_path".
+    // Returns InvalidArgument if contain_path() does not place the canonical
+    // path inside "dir".
+ Status canonicalize_local_file(const std::string& dir, const std::string&
file_path,
+ std::string* full_path);
+
+ // glob list the files match the path pattern.
+ // the result will be saved in "res", in absolute path with file size.
+ // "safe" means the path will be concat with the path prefix
config::user_files_secure_path,
+ // so that it can not list any files outside the
config::user_files_secure_path
+ Status safe_glob(const std::string& path, std::vector<FileInfo>* res);
+
protected:
Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
Status open_file_impl(const FileDescription& file_desc, const Path&
abs_path,
@@ -97,6 +106,8 @@ protected:
Status delete_directory_or_file_impl(const Path& path);
private:
+ // a wrapper for glob(), return file list in "res"
+ Status _glob(const std::string& pattern, std::vector<std::string>* res);
LocalFileSystem(Path&& root_path, std::string&& id = "");
};
diff --git a/be/src/olap/tablet_schema_cache.cpp
b/be/src/olap/tablet_schema_cache.cpp
index ee14358495..e14c3f7ecc 100644
--- a/be/src/olap/tablet_schema_cache.cpp
+++ b/be/src/olap/tablet_schema_cache.cpp
@@ -74,7 +74,6 @@ void TabletSchemaCache::_recycle() {
}
}
_is_stopped = true;
- LOG(INFO) << "xxx yyy stopped ";
}
} // namespace doris
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 3a5aa06125..1bbecfca4e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -55,6 +55,7 @@
#include "common/status.h"
#include "gutil/integral_types.h"
#include "http/http_client.h"
+#include "io/fs/local_file_system.h"
#include "io/fs/stream_load_pipe.h"
#include "io/io_common.h"
#include "olap/data_dir.h"
@@ -1608,4 +1609,25 @@ void
PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcContr
ExecEnv::GetInstance()->storage_engine()->get_tablet_rowset_versions(request,
response);
}
+// RPC handler for PGlobRequest: lists the files on this BE that match
+// request->pattern() using LocalFileSystem::safe_glob (which prefixes the
+// pattern with config::user_files_secure_path), and returns each match's path
+// and size together with the resulting status.
+// The work runs on _heavy_work_pool; if the task cannot be queued,
+// offer_failed() is used to report the failure to the caller.
+void PInternalServiceImpl::glob(google::protobuf::RpcController* controller,
+ const PGlobRequest* request, PGlobResponse*
response,
+ google::protobuf::Closure* done) {
+ bool ret = _heavy_work_pool.try_offer([request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ std::vector<io::FileInfo> files;
+ Status st =
io::global_local_filesystem()->safe_glob(request->pattern(), &files);
+    // Matches are copied into the response only on success; on failure the
+    // response carries just the error status.
+ if (st.ok()) {
+ for (auto& file : files) {
+ PGlobResponse_PFileInfo* pfile = response->add_files();
+ pfile->set_file(file.file_name);
+ pfile->set_size(file.file_size);
+ }
+ }
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ offer_failed(response, done, _heavy_work_pool);
+ }
+}
+}
+
} // namespace doris
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index aa30959ca3..47762cf7e5 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -181,6 +181,9 @@ public:
PGetTabletVersionsResponse* response,
google::protobuf::Closure* done) override;
+ void glob(google::protobuf::RpcController* controller, const PGlobRequest*
request,
+ PGlobResponse* response, google::protobuf::Closure* done)
override;
+
private:
void _exec_plan_fragment_in_pthread(google::protobuf::RpcController*
controller,
const PExecPlanFragmentRequest*
request,
diff --git a/be/test/io/fs/local_file_system_test.cpp
b/be/test/io/fs/local_file_system_test.cpp
index ea452782b2..953d7669b8 100644
--- a/be/test/io/fs/local_file_system_test.cpp
+++ b/be/test/io/fs/local_file_system_test.cpp
@@ -610,4 +610,34 @@ TEST_F(LocalFileSystemTest, TestRandomWrite) {
EXPECT_TRUE(file_reader->close().ok());
}
}
+
+TEST_F(LocalFileSystemTest, TestGlob) {
+    // Scratch tree built by this test:
+    //   file_path/1/f1.txt, file_path/1/f2.txt, file_path/f3.txt,
+    //   plus empty directories file_path/2 and file_path/3.
+    // NOTE(review): safe_glob resolves the "./file_path/..." patterns below under
+    // config::user_files_secure_path, so this test assumes that setting points at
+    // "./be/ut_build_ASAN/test"; confirm where the test environment sets it.
+    const std::string path = "./be/ut_build_ASAN/test/file_path/";
+    const auto& fs = io::global_local_filesystem();
+
+    EXPECT_TRUE(fs->delete_directory(path).ok());
+    for (const char* sub_dir : {"1", "2", "3"}) {
+        EXPECT_TRUE(fs->create_directory(path + sub_dir).ok());
+    }
+
+    save_string_file(path + "1/f1.txt", "just test");
+    save_string_file(path + "1/f2.txt", "just test");
+    save_string_file(path + "f3.txt", "just test");
+
+    std::vector<io::FileInfo> files;
+    // Expected to fail: a pattern containing "..", and a pattern rooted at "/".
+    EXPECT_FALSE(fs->safe_glob("./../*.txt", &files).ok());
+    EXPECT_FALSE(fs->safe_glob("/*.txt", &files).ok());
+
+    EXPECT_TRUE(fs->safe_glob("./file_path/1/*.txt", &files).ok());
+    EXPECT_EQ(2, files.size());
+    files.clear();
+
+    // Only directory 1 holds .txt files, so the wildcard directory yields 2 as well.
+    EXPECT_TRUE(fs->safe_glob("./file_path/*/*.txt", &files).ok());
+    EXPECT_EQ(2, files.size());
+
+    EXPECT_TRUE(fs->delete_directory(path).ok());
+}
+
} // namespace doris
diff --git a/docs/en/docs/admin-manual/config/be-config.md
b/docs/en/docs/admin-manual/config/be-config.md
index 601577f252..88a044fdfd 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -1465,3 +1465,8 @@ Indicates how many tablets failed to load in the data
directory. At the same tim
* Description: If true, when the process does not exceed the soft mem limit,
the query memory will not be limited; when the process memory exceeds the soft
mem limit, the query with the largest ratio between the currently used memory
and the exec_mem_limit will be canceled. If false, cancel query when the memory
used exceeds exec_mem_limit.
* Default value: true
+
+#### `user_files_secure_path`
+
+* Description: The root directory of the files that can be queried by the `local` table-valued function. The `file_path` passed to `local` is resolved relative to this directory.
+* Default value: `${DORIS_HOME}`
\ No newline at end of file
diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/local.md
b/docs/en/docs/sql-manual/sql-functions/table-functions/local.md
new file mode 100644
index 0000000000..ab3be7d4ae
--- /dev/null
+++ b/docs/en/docs/sql-manual/sql-functions/table-functions/local.md
@@ -0,0 +1,150 @@
+---
+{
+ "title": "local",
+ "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Local
+
+### Name
+
+<version since="dev">
+
+local
+
+</version>
+
+### Description
+
+The local table-valued function (tvf) allows users to read and access the contents of local files on a BE node, just like accessing a relational table. It currently supports the `csv/csv_with_names/csv_with_names_and_types/json/parquet/orc` file formats.
+
+It requires the `ADMIN` or `OPERATOR` privilege.
+
+#### syntax
+
+```sql
+local(
+ "file_path" = "path/to/file.txt",
+ "backend_id" = "be_id",
+ "format" = "csv",
+ "keyn" = "valuen"
+ ...
+ );
+```
+
+**parameter description**
+
+Related parameters for accessing local file on be node:
+
+- `file_path`:
+
+ (required) The path of the file to be read, relative to the `user_files_secure_path` directory. `user_files_secure_path` is a [BE configuration item](../../../admin-manual/config/be-config.md).
+
+ The path cannot contain `..`. Glob syntax is supported to match multiple files, such as `log/*.log`.
+
+- `backend_id`:
+
+ (required) The backend id where the file resides. The `backend_id` can be
obtained by `show backends` command.
+
+File format parameters:
+
+- `format`: (required) Currently support
`csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
+- `column_separator`: (optional) default `,`.
+- `line_delimiter`: (optional) default `\n`.
+- `compress_type`: (optional) Currently supports `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`. The default value is `UNKNOWN`, in which case the type is inferred automatically from the suffix of the file path.
+
+ The following 6 parameters are used for reading files in json format. For detailed usage, please refer to [Json Load](../../../data-operate/import/import-way/load-json-format.md).
+
+- `read_json_by_line`: (optional) default `"true"`
+- `strip_outer_array`: (optional) default `"false"`
+- `json_root`: (optional) default `""`
+- `json_paths`: (optional) default `""`
+- `num_as_string`: (optional) default `false`
+- `fuzzy_parse`: (optional) default `false`
+
+ <version since="dev">The following 2 parameters are used for loading in
csv format</version>
+
+- `trim_double_quotes`: Boolean type (optional), the default value is `false`.
True means that the outermost double quotes of each field in the csv file are
trimmed.
+- `skip_lines`: Integer type (optional), the default value is 0. It skips the given number of lines at the beginning of the csv file. It is ignored when the format is `csv_with_names` or `csv_with_names_and_types`.
+
+### Examples
+
+Analyze the log file on specified BE:
+
+```sql
+mysql> select * from local(
+ "file_path" = "log/be.out",
+ "backend_id" = "10006",
+ "format" = "csv")
+ where c1 like "%start_time%" limit 10;
++--------------------------------------------------------+
+| c1 |
++--------------------------------------------------------+
+| start time: 2023年 08月 07日 星期一 23:20:32 CST |
+| start time: 2023年 08月 07日 星期一 23:32:10 CST |
+| start time: 2023年 08月 08日 星期二 00:20:50 CST |
+| start time: 2023年 08月 08日 星期二 00:29:15 CST |
++--------------------------------------------------------+
+```
+
+Read and access csv format files located at path `${DORIS_HOME}/student.csv`:
+
+```sql
+mysql> select * from local(
+ "file_path" = "student.csv",
+ "backend_id" = "10003",
+ "format" = "csv");
++------+---------+--------+
+| c1 | c2 | c3 |
++------+---------+--------+
+| 1 | alice | 18 |
+| 2 | bob | 20 |
+| 3 | jack | 24 |
+| 4 | jackson | 19 |
+| 5 | liming | d18 |
++------+---------+--------+
+```
+
+Can be used with `desc function` :
+
+```sql
+mysql> desc function local(
+ "file_path" = "student.csv",
+ "backend_id" = "10003",
+ "format" = "csv");
++-------+------+------+-------+---------+-------+
+| Field | Type | Null | Key | Default | Extra |
++-------+------+------+-------+---------+-------+
+| c1 | TEXT | Yes | false | NULL | NONE |
+| c2 | TEXT | Yes | false | NULL | NONE |
+| c3 | TEXT | Yes | false | NULL | NONE |
++-------+------+------+-------+---------+-------+
+```
+
+### Keywords
+
+ local, table-valued-function, tvf
+
+### Best Practice
+
+ For more detailed usage of the local tvf, please refer to the [S3](./s3.md) tvf. The only difference between them is the way the storage system is accessed.
diff --git a/docs/sidebars.json b/docs/sidebars.json
index 52757c8f8d..a03377253e 100644
--- a/docs/sidebars.json
+++ b/docs/sidebars.json
@@ -700,6 +700,7 @@
"sql-manual/sql-functions/table-functions/s3",
"sql-manual/sql-functions/table-functions/hdfs",
"sql-manual/sql-functions/table-functions/iceberg_meta",
+
"sql-manual/sql-functions/table-functions/local",
"sql-manual/sql-functions/table-functions/backends",
"sql-manual/sql-functions/table-functions/frontends",
"sql-manual/sql-functions/table-functions/workload-group",
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 291c711470..5e48d603f6 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -1494,3 +1494,8 @@ load tablets from header failed, failed tablets size:
xxx, path=xxx
* 描述: 如果为true,则当内存未超过 exec_mem_limit 时,查询内存将不受限制;当进程内存超过 exec_mem_limit 且大于
2GB 时,查询会被取消。如果为false,则在使用的内存超过 exec_mem_limit 时取消查询。
* 默认值: true
+
+#### `user_files_secure_path`
+
+* 描述: `local` 表函数查询的文件的存储目录。
+* 默认值: `${DORIS_HOME}`
\ No newline at end of file
diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/local.md
b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/local.md
new file mode 100644
index 0000000000..e4ed638017
--- /dev/null
+++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/local.md
@@ -0,0 +1,147 @@
+---
+{
+ "title": "local",
+ "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## local
+
+### Name
+
+<version since="dev">
+
+local
+
+</version>
+
+### Description
+
+Local表函数(table-valued-function,tvf),可以让用户像访问关系表格式数据一样,读取并访问 be
上的文件内容。目前支持`csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`文件格式。
+
+该函数需要 ADMIN 或 OPERATOR 权限。
+
+#### syntax
+```sql
+local(
+ "file_path" = "path/to/file.txt",
+ "backend_id" = "be_id",
+ "format" = "csv",
+ "keyn" = "valuen"
+ ...
+ );
+```
+
+**参数说明**
+
+访问local文件的相关参数:
+- `file_path`
+
+ (必填)待读取文件的路径,该路径是一个相对于 `user_files_secure_path` 目录的相对路径, 其中
`user_files_secure_path` 参数是
[be的一个配置项](../../../admin-manual/config/be-config.md) 。
+
+ 路径中不能包含 `..`,可以使用 glob 语法进行模糊匹配,如:`logs/*.log`
+
+- `backend_id`:
+
+ (必填)文件所在的 be id。 `backend_id` 可以通过 `show backends` 命令得到。
+
+文件格式相关参数
+- `format`:(必填) 目前支持
`csv/csv_with_names/csv_with_names_and_types/json/parquet/orc`
+- `column_separator`:(选填) 列分隔符, 默认为`,`。
+- `line_delimiter`:(选填) 行分隔符,默认为`\n`。
+- `compress_type`: (选填) 目前支持 `UNKNOWN/PLAIN/GZ/LZO/BZ2/LZ4FRAME/DEFLATE`。 默认值为 `UNKNOWN`, 将会根据文件路径的后缀自动推断类型。
+
+ 下面6个参数是用于json格式的导入,具体使用方法可以参照:[Json
Load](../../../data-operate/import/import-way/load-json-format.md)
+
+- `read_json_by_line`: (选填) 默认为 `"true"`
+- `strip_outer_array`: (选填) 默认为 `"false"`
+- `json_root`: (选填) 默认为空
+- `json_paths`: (选填) 默认为空
+- `num_as_string`: (选填) 默认为 `false`
+- `fuzzy_parse`: (选填) 默认为 `false`
+
+ <version since="dev">下面2个参数是用于csv格式的导入</version>
+
+- `trim_double_quotes`: 布尔类型,选填,默认值为 `false`,为 `true` 时表示裁剪掉 csv 文件每个字段最外层的双引号
+- `skip_lines`: 整数类型,选填,默认值为0,含义为跳过csv文件的前几行。当 format 设置为 `csv_with_names` 或 `csv_with_names_and_types` 时,该参数会失效。
+
+### Examples
+
+分析指定 BE 上的日志文件:
+
+```sql
+mysql> select * from local(
+ "file_path" = "log/be.out",
+ "backend_id" = "10006",
+ "format" = "csv")
+ where c1 like "%start_time%" limit 10;
++--------------------------------------------------------+
+| c1 |
++--------------------------------------------------------+
+| start time: 2023年 08月 07日 星期一 23:20:32 CST |
+| start time: 2023年 08月 07日 星期一 23:32:10 CST |
+| start time: 2023年 08月 08日 星期二 00:20:50 CST |
+| start time: 2023年 08月 08日 星期二 00:29:15 CST |
++--------------------------------------------------------+
+```
+
+读取和访问位于路径`${DORIS_HOME}/student.csv`的 csv格式文件:
+
+```sql
+mysql> select * from local(
+ "file_path" = "student.csv",
+ "backend_id" = "10003",
+ "format" = "csv");
++------+---------+--------+
+| c1 | c2 | c3 |
++------+---------+--------+
+| 1 | alice | 18 |
+| 2 | bob | 20 |
+| 3 | jack | 24 |
+| 4 | jackson | 19 |
+| 5 | liming | d18 |
++------+---------+--------+
+```
+
+可以配合`desc function`使用
+
+```sql
+mysql> desc function local(
+ "file_path" = "student.csv",
+ "backend_id" = "10003",
+ "format" = "csv");
++-------+------+------+-------+---------+-------+
+| Field | Type | Null | Key | Default | Extra |
++-------+------+------+-------+---------+-------+
+| c1 | TEXT | Yes | false | NULL | NONE |
+| c2 | TEXT | Yes | false | NULL | NONE |
+| c3 | TEXT | Yes | false | NULL | NONE |
++-------+------+------+-------+---------+-------+
+```
+
+### Keywords
+
+ local, table-valued-function, tvf
+
+### Best Practice
+
+ 关于local tvf的更详细使用方法可以参照 [S3](./s3.md) tvf, 唯一不同的是访问存储系统的方式不一样。
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
index ba1b07eb4c..166b3297ea 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
@@ -27,6 +27,7 @@ import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.tablefunction.BackendsTableValuedFunction;
+import org.apache.doris.tablefunction.LocalTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import java.util.Map;
@@ -103,11 +104,12 @@ public class TableValuedFunctionRef extends TableRef {
return;
}
- // check privilige for backends tvf
- if (funcName.equalsIgnoreCase(BackendsTableValuedFunction.NAME)) {
+ // check privilige for backends/local tvf
+ if (funcName.equalsIgnoreCase(BackendsTableValuedFunction.NAME)
+ || funcName.equalsIgnoreCase(LocalTableValuedFunction.NAME)) {
if
(!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)
&&
!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(),
-
PrivPredicate.OPERATOR)) {
+ PrivPredicate.OPERATOR)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN/OPERATOR");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 586baa58d0..db8f835abb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -364,7 +364,8 @@ public abstract class FileQueryScanNode extends
FileScanNode {
params.addToBrokerAddresses(new
TNetworkAddress(broker.host, broker.port));
}
}
- } else if (locationType == TFileType.FILE_S3 &&
!params.isSetProperties()) {
+ } else if ((locationType == TFileType.FILE_S3 || locationType ==
TFileType.FILE_LOCAL)
+ && !params.isSetProperties()) {
params.setProperties(locationProperties);
}
@@ -405,6 +406,7 @@ public abstract class FileQueryScanNode extends
FileScanNode {
rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
} else if (locationType == TFileType.FILE_S3
|| locationType == TFileType.FILE_BROKER
+ || locationType == TFileType.FILE_LOCAL
|| locationType == TFileType.FILE_NET) {
// need full path
rangeDesc.setPath(fileSplit.getPath().toString());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
index 476e16b098..d895856401 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -18,6 +18,7 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
@@ -27,7 +28,9 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.LocalTableValuedFunction;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
@@ -40,6 +43,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -62,6 +66,21 @@ public class TVFScanNode extends FileQueryScanNode {
}
@Override
+    protected void initBackendPolicy() throws UserException {
+        List<String> preferredHosts = new ArrayList<>();
+        if (tableValuedFunction instanceof LocalTableValuedFunction) {
+            // The local tvf reads files from the BE named by its "backend_id"
+            // property, so that backend's host is passed as the only preferred location.
+            // NOTE(review): the preference is expressed by host only; if several BEs
+            // run on the same host, confirm the scan is assigned to the intended one.
+            LocalTableValuedFunction localTvf = (LocalTableValuedFunction) tableValuedFunction;
+            Long beId = localTvf.getBackendId();
+            Backend targetBe = Env.getCurrentSystemInfo().getBackend(beId);
+            if (targetBe == null) {
+                throw new UserException("Backend " + beId + " does not exist");
+            }
+            preferredHosts.add(targetBe.getHost());
+        }
+        backendPolicy.init(preferredHosts);
+        numNodes = backendPolicy.numBackends();
+    }
+
protected String getFsName(FileSplit split) {
return tableValuedFunction.getFsName();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 484f6e77ed..8df33927fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -142,6 +142,10 @@ public class BackendServiceClient {
return stub.getColumnIdsByTabletIds(request);
}
+    /**
+     * Sends a glob request through this client's stub.
+     *
+     * @param request the glob pattern to evaluate on the BE
+     * @return a future for the BE's {@code PGlobResponse}
+     */
+ public Future<InternalService.PGlobResponse>
glob(InternalService.PGlobRequest request) {
+ return stub.glob(request);
+ }
+
public void shutdown() {
if (!channel.isShutdown()) {
channel.shutdown();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 39dfd7915f..5fc3ef2815 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -387,4 +387,16 @@ public class BackendServiceProxy {
}
}
+    /**
+     * Sends a glob request to the BE at {@code address}.
+     *
+     * @param address BE brpc address
+     * @param request the glob pattern to evaluate on that BE
+     * @return a future for the BE's {@code PGlobResponse}
+     * @throws RpcException if obtaining the client or issuing the call fails
+     */
+    public Future<InternalService.PGlobResponse> glob(TNetworkAddress address,
+            InternalService.PGlobRequest request) throws RpcException {
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.glob(request);
+        } catch (Throwable e) {
+            // Pass the throwable as the trailing argument so log4j prints it (with
+            // its stack trace) after "error: " instead of dropping the cause.
+            LOG.warn("failed to glob dir from BE {}:{}, path: {}, error: ",
+                    address.getHostname(), address.getPort(), request.getPattern(), e);
+            throw new RpcException(address.hostname, e.getMessage());
+        }
+    }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 7e1a4b6698..035d54c0d1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
@@ -347,18 +348,13 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
return columns;
}
// get one BE address
- TNetworkAddress address = null;
columns = Lists.newArrayList();
- for (Backend be :
org.apache.doris.catalog.Env.getCurrentSystemInfo().getIdToBackend().values()) {
- if (be.isAlive()) {
- address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
- break;
- }
- }
- if (address == null) {
+ Backend be = getBackend();
+ if (be == null) {
throw new AnalysisException("No Alive backends");
}
+ TNetworkAddress address = new TNetworkAddress(be.getHost(),
be.getBrpcPort());
try {
PFetchTableSchemaRequest request = getFetchTableStructureRequest();
Future<InternalService.PFetchTableSchemaResult> future =
BackendServiceProxy.getInstance()
@@ -390,6 +386,15 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
return columns;
}
+    /**
+     * Picks the backend used to fetch the table schema: the first alive backend
+     * in iteration order of the system info, or {@code null} if none is alive.
+     * Declared protected so subclasses can choose a different backend.
+     */
+    protected Backend getBackend() {
+        return Env.getCurrentSystemInfo().getIdToBackend().values().stream()
+                .filter(Backend::isAlive)
+                .findFirst()
+                .orElse(null);
+    }
+
/**
* Convert PTypeDesc into doris column type
* @param typeNodes list PTypeNodes in PTypeDesc
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
new file mode 100644
index 0000000000..f6693317ba
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PGlobResponse;
+import org.apache.doris.proto.InternalService.PGlobResponse.PFileInfo;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The implementation of the table valued function
+ * local("file_path" = "path/to/file.txt", "backend_id" = "be_id").
+ */
+public class LocalTableValuedFunction extends ExternalFileTableValuedFunction {
+ private static final Logger LOG =
LogManager.getLogger(LocalTableValuedFunction.class);
+
+ public static final String NAME = "local";
+ public static final String FILE_PATH = "file_path";
+ public static final String BACKEND_ID = "backend_id";
+
+ private static final ImmutableSet<String> LOCATION_PROPERTIES = new
ImmutableSet.Builder<String>()
+ .add(FILE_PATH)
+ .add(BACKEND_ID)
+ .build();
+
+ private String filePath;
+ private long backendId;
+
+ public LocalTableValuedFunction(Map<String, String> params) throws
AnalysisException {
+ Map<String, String> fileFormatParams = new CaseInsensitiveMap();
+ locationProperties = Maps.newHashMap();
+ for (String key : params.keySet()) {
+ if (FILE_FORMAT_PROPERTIES.contains(key.toLowerCase())) {
+ fileFormatParams.put(key, params.get(key));
+ } else if (LOCATION_PROPERTIES.contains(key.toLowerCase())) {
+ locationProperties.put(key.toLowerCase(), params.get(key));
+ } else {
+ throw new AnalysisException(key + " is invalid property");
+ }
+ }
+
+ if (!locationProperties.containsKey(FILE_PATH)) {
+ throw new AnalysisException(String.format("Configuration '%s' is
required.", FILE_PATH));
+ }
+ if (!locationProperties.containsKey(BACKEND_ID)) {
+ throw new AnalysisException(String.format("Configuration '%s' is
required.", BACKEND_ID));
+ }
+
+ filePath = locationProperties.get(FILE_PATH);
+ backendId = Long.parseLong(locationProperties.get(BACKEND_ID));
+ parseProperties(fileFormatParams);
+
+ getFileListFromBackend();
+ }
+
+ private void getFileListFromBackend() throws AnalysisException {
+ Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+ if (be == null) {
+ throw new AnalysisException("backend not found with backend_id = "
+ backendId);
+ }
+
+ BackendServiceProxy proxy = BackendServiceProxy.getInstance();
+ TNetworkAddress address = be.getBrpcAdress();
+ InternalService.PGlobRequest.Builder requestBuilder =
InternalService.PGlobRequest.newBuilder();
+ requestBuilder.setPattern(filePath);
+ try {
+ Future<PGlobResponse> response = proxy.glob(address,
requestBuilder.build());
+ PGlobResponse globResponse = response.get(5, TimeUnit.SECONDS);
+ if (globResponse.getStatus().getStatusCode() != 0) {
+ throw new AnalysisException(
+ "error code: " +
globResponse.getStatus().getStatusCode()
+ + ", error msg: " +
globResponse.getStatus().getErrorMsgsList());
+ }
+ for (PFileInfo file : globResponse.getFilesList()) {
+ fileStatuses.add(new TBrokerFileStatus(file.getFile().trim(),
false, file.getSize(), true));
+ LOG.info("get file from backend success. file: {}, size: {}",
file.getFile(), file.getSize());
+ }
+ } catch (Exception e) {
+ throw new AnalysisException("get file list from backend failed. "
+ e.getMessage());
+ }
+ }
+
+ @Override
+ public TFileType getTFileType() {
+ return TFileType.FILE_LOCAL;
+ }
+
+ @Override
+ public String getFilePath() {
+ return filePath;
+ }
+
+ @Override
+ public BrokerDesc getBrokerDesc() {
+ return new BrokerDesc("LocalTvfBroker", StorageType.LOCAL,
locationProperties);
+ }
+
+ @Override
+ public String getTableName() {
+ return "LocalTableValuedFunction";
+ }
+
+ public Long getBackendId() {
+ return backendId;
+ }
+
+ @Override
+ protected Backend getBackend() {
+ return Env.getCurrentSystemInfo().getBackend(backendId);
+ }
+}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index ea135b8b1b..2c67178494 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -49,6 +49,8 @@ public abstract class TableValuedFunctionIf {
return new S3TableValuedFunction(params);
case HdfsTableValuedFunction.NAME:
return new HdfsTableValuedFunction(params);
+ case LocalTableValuedFunction.NAME:
+ return new LocalTableValuedFunction(params);
case IcebergTableValuedFunction.NAME:
return new IcebergTableValuedFunction(params);
case BackendsTableValuedFunction.NAME:
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index de1f7876dc..8b698b9948 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -670,6 +670,19 @@ message PGetTabletVersionsResponse {
repeated PVersion versions = 2;
};
+message PGlobRequest {
+ optional string pattern = 1;
+}
+
+message PGlobResponse {
+ message PFileInfo {
+ optional string file = 1;
+ optional int64 size = 2;
+ };
+ required PStatus status = 1;
+ repeated PFileInfo files = 2;
+}
+
service PBackendService {
rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
@@ -707,5 +720,6 @@ service PBackendService {
rpc tablet_fetch_data(PTabletKeyLookupRequest) returns
(PTabletKeyLookupResponse);
rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns
(PFetchColIdsResponse);
rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns
(PGetTabletVersionsResponse);
+ rpc glob(PGlobRequest) returns (PGlobResponse);
};
diff --git a/regression-test/suites/external_table_p0/tvf/test_local_tvf.groovy
b/regression-test/suites/external_table_p0/tvf/test_local_tvf.groovy
new file mode 100644
index 0000000000..48782c4ad6
--- /dev/null
+++ b/regression-test/suites/external_table_p0/tvf/test_local_tvf.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This suite tests the `local` tvf
+suite("test_local_tvf") {
+ List<List<Object>> table = sql """ select * from backends(); """
+ assertTrue(table.size() > 0)
+ def be_id = table[0][0]
+
+ table = sql """
+ select count(*) from local(
+ "file_path" = "log/be.out",
+ "backend_id" = "${be_id}",
+ "format" = "csv")
+ where c1 like "%start_time%";"""
+
+ assertTrue(table.size() > 0)
+ assertTrue(Long.valueOf(table[0][0]) > 0)
+
+ table = sql """
+ select count(*) from local(
+ "file_path" = "log/*.out",
+ "backend_id" = "${be_id}",
+ "format" = "csv")
+ where c1 like "%start_time%";"""
+
+ assertTrue(table.size() > 0)
+ assertTrue(Long.valueOf(table[0][0]) > 0)
+
+ test {
+ sql """
+ select count(*) from local(
+ "file_path" = "../log/be.out",
+ "backend_id" = "${be_id}",
+ "format" = "csv")
+ where c1 like "%start_time%";
+ """
+ // check exception message contains
+ exception "can not contain '..' in path"
+ }
+
+ test {
+ sql """
+ select count(*) from local(
+ "file_path" = "./log/xx.out",
+ "backend_id" = "${be_id}",
+ "format" = "csv")
+ where c1 like "%start_time%";
+ """
+ // check exception message contains
+ exception "No matches found"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]