This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 13fb69550a [improvement](kerberos) disable hdfs fs handle cache to renew kerberos ticket at a fixed interval (#21265)
13fb69550a is described below
commit 13fb69550a18c99d4bfb64cc001a024f3053683b
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Jul 4 17:13:34 2023 +0800
[improvement](kerberos) disable hdfs fs handle cache to renew kerberos ticket at a fixed interval (#21265)
Add a new BE config `kerberos_expiration_time_seconds`, default 43200,
i.e. half of a typical 86400-second (24h) `ticket_lifetime` in `krb5.conf`.
It is recommended to set it to half of the `ticket_lifetime` value in `krb5.conf`.
If a cached HDFS fs handle has lived longer than this time, it will be
marked as invalid and recreated, and the kerberos ticket will be renewed.
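For illustration only, the intended relationship between this BE config and the
Kerberos ticket lifetime could look like the following (example values, not
required settings; adjust to your own krb5 setup):

    # be.conf
    kerberos_expiration_time_seconds = 43200

    # /etc/krb5.conf
    [libdefaults]
        ticket_lifetime = 24h

With these values a cached kerberized fs handle is invalidated after 12 hours,
so a new handle is created (and kinit is run again) well before the 24-hour
ticket expires.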
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 4 ++++
be/src/io/fs/hdfs_file_system.cpp | 49 +++++++++++++++++++++------------------
be/src/io/fs/hdfs_file_system.h | 26 +++++++++++++++++----
be/src/io/hdfs_builder.cpp | 1 +
5 files changed, 54 insertions(+), 27 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 24c5d5f32a..7186d8c970 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1028,6 +1028,7 @@ DEFINE_Int64(max_external_file_meta_cache_num, "20000");
DEFINE_Int32(rocksdb_max_write_buffer_number, "5");
DEFINE_Bool(allow_invalid_decimalv2_literal, "false");
+DEFINE_mInt64(kerberos_expiration_time_seconds, "43200");
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7601eade16..6230e4caf2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1045,6 +1045,10 @@ DECLARE_Int32(rocksdb_max_write_buffer_number);
// Allow invalid decimalv2 literal for compatible with old version. Recommend set it false strongly.
DECLARE_mBool(allow_invalid_decimalv2_literal);
+// the max expiration time of kerberos ticket.
+// If a hdfs filesystem with kerberos authentication lives longer
+// than this time, it will be expired.
+DECLARE_mInt64(kerberos_expiration_time_seconds);
#ifdef BE_TEST
// test s3
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index 745a7736f9..775754bd4d 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -78,7 +78,7 @@ private:
HdfsFileSystemCache() = default;
uint64 _hdfs_hash_code(const THdfsParams& hdfs_params);
- Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs);
+ Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs, bool* is_kerberos);
void _clean_invalid();
void _clean_oldest();
};
@@ -423,9 +423,11 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
// ************* HdfsFileSystemCache ******************
int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;
-Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs) {
+Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs,
+ bool* is_kerberos) {
HDFSCommonBuilder builder;
RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder));
+ *is_kerberos = builder.is_need_kinit();
hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
if (hdfs_fs == nullptr) {
return Status::IOError("faield to connect to hdfs {}: {}", hdfs_params.fs_name,
@@ -467,30 +469,33 @@ Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params,
auto it = _cache.find(hash_code);
if (it != _cache.end()) {
HdfsFileSystemHandle* handle = it->second.get();
- if (handle->invalid()) {
- hdfsFS hdfs_fs = nullptr;
- RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
- *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
- } else {
+ if (!handle->invalid()) {
handle->inc_ref();
*fs_handle = handle;
+ return Status::OK();
}
+ // fs handle is invalid, erase it.
+ _cache.erase(it);
+ LOG(INFO) << "erase the hdfs handle, fs name: " << hdfs_params.fs_name;
+ }
+
+ // not find in cache, or fs handle is invalid
+ // create a new one and try to put it into cache
+ hdfsFS hdfs_fs = nullptr;
+ bool is_kerberos = false;
+ RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs, &is_kerberos));
+ if (_cache.size() >= MAX_CACHE_HANDLE) {
+ _clean_invalid();
+ _clean_oldest();
+ }
+ if (_cache.size() < MAX_CACHE_HANDLE) {
+ std::unique_ptr<HdfsFileSystemHandle> handle =
+ std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true, is_kerberos);
+ handle->inc_ref();
+ *fs_handle = handle.get();
+ _cache[hash_code] = std::move(handle);
} else {
- hdfsFS hdfs_fs = nullptr;
- RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
- if (_cache.size() >= MAX_CACHE_HANDLE) {
- _clean_invalid();
- _clean_oldest();
- }
- if (_cache.size() < MAX_CACHE_HANDLE) {
- std::unique_ptr<HdfsFileSystemHandle> handle =
- std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true);
- handle->inc_ref();
- *fs_handle = handle.get();
- _cache[hash_code] = std::move(handle);
- } else {
- *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
- }
+ *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false, is_kerberos);
}
}
return Status::OK();
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index d542cd1ba7..bd28ec73c2 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -27,6 +27,7 @@
#include <string>
#include <vector>
+#include "common/config.h"
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/hdfs.h"
@@ -41,14 +42,21 @@ struct FileInfo;
class HdfsFileSystemHandle {
public:
- HdfsFileSystemHandle(hdfsFS fs, bool cached)
- : hdfs_fs(fs), from_cache(cached), _ref_cnt(0), _last_access_time(0), _invalid(false) {}
+ HdfsFileSystemHandle(hdfsFS fs, bool cached, bool is_kerberos)
+ : hdfs_fs(fs),
+ from_cache(cached),
+ _is_kerberos(is_kerberos),
+ _ref_cnt(0),
+ _create_time(_now()),
+ _last_access_time(0),
+ _invalid(false) {}
~HdfsFileSystemHandle() {
DCHECK(_ref_cnt == 0);
if (hdfs_fs != nullptr) {
- // Even if there is an error, the resources associated with the hdfsFS will be freed.
- hdfsDisconnect(hdfs_fs);
+ // DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed"
+ // even if we create a new one
+ // hdfsDisconnect(hdfs_fs);
}
hdfs_fs = nullptr;
}
@@ -67,7 +75,11 @@ public:
int ref_cnt() { return _ref_cnt; }
- bool invalid() { return _invalid; }
+ bool invalid() {
+ return _invalid ||
+ (_is_kerberos &&
+ _now() - _create_time.load() > config::kerberos_expiration_time_seconds * 1000);
+ }
void set_invalid() { _invalid = true; }
@@ -77,8 +89,12 @@ public:
const bool from_cache;
private:
+ const bool _is_kerberos;
// the number of referenced client
std::atomic<int> _ref_cnt;
+ // For kerberos authentication, we need to save create time so that
+ // we can know if the kerberos ticket is expired.
+ std::atomic<uint64_t> _create_time;
// HdfsFileSystemCache try to remove the oldest handler when the cache is full
std::atomic<uint64_t> _last_access_time;
// Client will set invalid if error thrown, and HdfsFileSystemCache will not reuse this handler
diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp
index 73edc326c3..19986f76e4 100644
--- a/be/src/io/hdfs_builder.cpp
+++ b/be/src/io/hdfs_builder.cpp
@@ -72,6 +72,7 @@ Status HDFSCommonBuilder::run_kinit() {
#endif
hdfsBuilderConfSetStr(hdfs_builder, "hadoop.security.kerberos.ticket.cache.path",
ticket_path.c_str());
+ LOG(INFO) << "finished to run kinit command: " << fmt::to_string(kinit_command);
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]