This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new b7680fd95e2 [fix](broker-read) refactor broker reading process to avoid null broker connection (#26050) (#26349)
b7680fd95e2 is described below
commit b7680fd95e2886737b1a08ccbeec525c38e36d56
Author: Siyang Tang <[email protected]>
AuthorDate: Fri Nov 3 12:28:45 2023 +0800
[fix](broker-read) refactor broker reading process to avoid null broker connection (#26050) (#26349)
---
be/src/io/fs/broker_file_reader.cpp | 81 +++-------------------
be/src/io/fs/broker_file_reader.h | 11 ++-
be/src/io/fs/broker_file_system.cpp | 135 ++++++++++++++++++++++++++----------
be/src/io/fs/broker_file_system.h | 7 +-
be/src/runtime/client_cache.h | 2 +
5 files changed, 121 insertions(+), 115 deletions(-)
diff --git a/be/src/io/fs/broker_file_reader.cpp b/be/src/io/fs/broker_file_reader.cpp
index c48433ba23a..fa21e754342 100644
--- a/be/src/io/fs/broker_file_reader.cpp
+++ b/be/src/io/fs/broker_file_reader.cpp
@@ -35,19 +35,16 @@
#include "io/fs/broker_file_system.h"
#include "util/doris_metrics.h"
-namespace doris {
-namespace io {
+namespace doris::io {
class IOContext;
-BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path,
- size_t file_size, TBrokerFD fd,
- std::shared_ptr<BrokerFileSystem> fs)
- : _path(path),
+BrokerFileReader::BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size,
+ TBrokerFD fd, std::shared_ptr<BrokerFileSystem> fs)
+ : _path(std::move(path)),
_file_size(file_size),
_broker_addr(broker_addr),
_fd(fd),
_fs(std::move(fs)) {
- _fs->get_client(&_client);
DorisMetrics::instance()->broker_file_open_reading->increment(1);
DorisMetrics::instance()->broker_file_reader_total->increment(1);
}
@@ -59,32 +56,7 @@ BrokerFileReader::~BrokerFileReader() {
Status BrokerFileReader::close() {
bool expected = false;
if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
- TBrokerCloseReaderRequest request;
- request.__set_version(TBrokerVersion::VERSION_ONE);
- request.__set_fd(_fd);
-
- TBrokerOperationStatus response;
- try {
- try {
- (*_client)->closeReader(response, request);
- } catch (apache::thrift::transport::TTransportException&) {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->closeReader(response, request);
- }
- } catch (apache::thrift::TException& e) {
- std::stringstream ss;
- ss << "Close broker reader failed, broker:" << _broker_addr << " failed:" << e.what();
- return Status::RpcError(ss.str());
- }
-
- if (response.statusCode != TBrokerOperationStatusCode::OK) {
- std::stringstream ss;
- ss << "close broker reader failed, broker:" << _broker_addr
- << " failed:" << response.message;
- return Status::InternalError(ss.str());
- }
-
+ RETURN_IF_ERROR(_fs->close_file(_fd));
DorisMetrics::instance()->broker_file_open_reading->increment(-1);
}
return Status::OK();
@@ -100,45 +72,12 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes
return Status::OK();
}
- TBrokerPReadRequest request;
- request.__set_version(TBrokerVersion::VERSION_ONE);
- request.__set_fd(_fd);
- request.__set_offset(offset);
- request.__set_length(bytes_req);
-
- TBrokerReadResponse response;
- try {
- VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset
- << ", read bytes length:" << bytes_req;
- try {
- (*_client)->pread(response, request);
- } catch (apache::thrift::transport::TTransportException& e) {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- RETURN_IF_ERROR((*_client).reopen());
- LOG(INFO) << "retry reading from broker: " << _broker_addr << ". reason: " << e.what();
- (*_client)->pread(response, request);
- }
- } catch (apache::thrift::TException& e) {
- std::stringstream ss;
- ss << "Open broker reader failed, broker:" << _broker_addr << " failed:" << e.what();
- return Status::RpcError(ss.str());
- }
-
- if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) {
- // read the end of broker's file
- *bytes_read = 0;
- return Status::OK();
- } else if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
- std::stringstream ss;
- ss << "Open broker reader failed, broker:" << _broker_addr
- << " failed:" << response.opStatus.message;
- return Status::InternalError(ss.str());
- }
+ std::string data;
+ RETURN_IF_ERROR(_fs->read_file(_fd, offset, bytes_req, &data));
- *bytes_read = response.data.size();
- memcpy(to, response.data.data(), *bytes_read);
+ *bytes_read = data.size();
+ memcpy(to, data.data(), *bytes_read);
return Status::OK();
}
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/broker_file_reader.h b/be/src/io/fs/broker_file_reader.h
index a7f126ba0db..3f51fb979c3 100644
--- a/be/src/io/fs/broker_file_reader.h
+++ b/be/src/io/fs/broker_file_reader.h
@@ -32,15 +32,14 @@
#include "runtime/client_cache.h"
#include "util/slice.h"
-namespace doris {
-namespace io {
+namespace doris::io {
class IOContext;
class BrokerFileReader : public FileReader {
public:
- BrokerFileReader(const TNetworkAddress& broker_addr, const Path& path, size_t file_size,
- TBrokerFD fd, std::shared_ptr<BrokerFileSystem> fs);
+ BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t file_size, TBrokerFD fd,
+ std::shared_ptr<BrokerFileSystem> fs);
~BrokerFileReader() override;
@@ -66,8 +65,6 @@ private:
TBrokerFD _fd;
std::shared_ptr<BrokerFileSystem> _fs;
- std::shared_ptr<BrokerServiceConnection> _client;
std::atomic<bool> _closed = false;
};
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
index 3578fb91eaa..a3c93c04a7a 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -45,8 +45,7 @@
#include "runtime/exec_env.h"
#include "util/slice.h"
-namespace doris {
-namespace io {
+namespace doris::io {
#ifdef BE_TEST
inline BrokerServiceClientCache* client_cache() {
@@ -70,8 +69,8 @@ inline const std::string& client_id(const TNetworkAddress& addr) {
#ifndef CHECK_BROKER_CLIENT
#define CHECK_BROKER_CLIENT(client) \
- if (!client) { \
- return Status::IOError("init Broker client error"); \
+ if (!client || !client->is_alive()) { \
+ return Status::IOError("connect to broker failed"); \
}
#endif
@@ -90,8 +89,8 @@ BrokerFileSystem::BrokerFileSystem(const TNetworkAddress& broker_addr,
Status BrokerFileSystem::connect_impl() {
Status status = Status::OK();
- _client.reset(new BrokerServiceConnection(client_cache(), _broker_addr,
- config::thrift_rpc_timeout_ms, &status));
+ _connection = std::make_unique<BrokerServiceConnection>(client_cache(), _broker_addr,
+ config::thrift_rpc_timeout_ms, &status);
return status;
}
@@ -109,7 +108,7 @@ Status BrokerFileSystem::open_file_internal(const FileDescription& fd, const Pat
RETURN_IF_ERROR(file_size_impl(abs_path, &fsize));
}
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
TBrokerOpenReaderRequest request;
request.__set_version(TBrokerVersion::VERSION_ONE);
request.__set_path(abs_path);
@@ -121,11 +120,11 @@ Status BrokerFileSystem::open_file_internal(const FileDescription& fd, const Pat
try {
Status status;
try {
- (*_client)->openReader(*response, request);
+ (*_connection)->openReader(*response, request);
} catch (apache::thrift::transport::TTransportException&) {
std::this_thread::sleep_for(std::chrono::seconds(1));
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->openReader(*response, request);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->openReader(*response, request);
}
} catch (apache::thrift::TException& e) {
return Status::RpcError("failed to open file {}: {}", abs_path.native(),
@@ -147,7 +146,7 @@ Status BrokerFileSystem::create_directory_impl(const Path& /*path*/, bool /*fail
}
Status BrokerFileSystem::delete_file_impl(const Path& file) {
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
try {
// rm file from remote path
TBrokerDeletePathRequest del_req;
@@ -157,10 +156,10 @@ Status BrokerFileSystem::delete_file_impl(const Path& file) {
del_req.__set_properties(_broker_prop);
try {
- (*_client)->deletePath(del_rep, del_req);
+ (*_connection)->deletePath(del_rep, del_req);
} catch (apache::thrift::transport::TTransportException&) {
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->deletePath(del_rep, del_req);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->deletePath(del_rep, del_req);
}
if (del_rep.statusCode == TBrokerOperationStatusCode::OK) {
@@ -187,7 +186,7 @@ Status BrokerFileSystem::batch_delete_impl(const std::vector<Path>& files) {
}
Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const {
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
*res = false;
try {
TBrokerCheckPathExistRequest check_req;
@@ -197,10 +196,10 @@ Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const {
check_req.__set_properties(_broker_prop);
try {
- (*_client)->checkPathExist(check_rep, check_req);
+ (*_connection)->checkPathExist(check_rep, check_req);
} catch (apache::thrift::transport::TTransportException&) {
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->checkPathExist(check_rep, check_req);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->checkPathExist(check_rep, check_req);
}
if (check_rep.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
@@ -220,7 +219,7 @@ Status BrokerFileSystem::exists_impl(const Path& path, bool* res) const {
}
Status BrokerFileSystem::file_size_impl(const Path& path, int64_t* file_size) const {
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
try {
TBrokerFileSizeRequest req;
req.__set_version(TBrokerVersion::VERSION_ONE);
@@ -229,10 +228,10 @@ Status BrokerFileSystem::file_size_impl(const Path& path, int64_t* file_size) co
TBrokerFileSizeResponse resp;
try {
- (*_client)->fileSize(resp, req);
+ (*_connection)->fileSize(resp, req);
} catch (apache::thrift::transport::TTransportException&) {
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->fileSize(resp, req);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->fileSize(resp, req);
}
if (resp.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
@@ -257,7 +256,7 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector<
if (!(*exists)) {
return Status::OK();
}
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
Status status = Status::OK();
try {
// get existing files from remote path
@@ -270,10 +269,10 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector<
list_req.__set_fileNameOnly(true); // we only need file name, not abs path
try {
- (*_client)->listPath(list_rep, list_req);
+ (*_connection)->listPath(list_rep, list_req);
} catch (apache::thrift::transport::TTransportException&) {
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->listPath(list_rep, list_req);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->listPath(list_rep, list_req);
}
if (list_rep.opStatus.statusCode == TBrokerOperationStatusCode::FILE_NOT_FOUND) {
@@ -310,7 +309,7 @@ Status BrokerFileSystem::list_impl(const Path& dir, bool only_file, std::vector<
}
Status BrokerFileSystem::rename_impl(const Path& orig_name, const Path& new_name) {
- CHECK_BROKER_CLIENT(_client);
+ CHECK_BROKER_CLIENT(_connection);
try {
TBrokerOperationStatus op_status;
TBrokerRenamePathRequest rename_req;
@@ -320,10 +319,10 @@ Status BrokerFileSystem::rename_impl(const Path& orig_name, const Path& new_name
rename_req.__set_properties(_broker_prop);
try {
- (*_client)->renamePath(op_status, rename_req);
+ (*_connection)->renamePath(op_status, rename_req);
} catch (apache::thrift::transport::TTransportException&) {
- RETURN_IF_ERROR((*_client).reopen());
- (*_client)->renamePath(op_status, rename_req);
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->renamePath(op_status, rename_req);
}
if (op_status.statusCode != TBrokerOperationStatusCode::OK) {
@@ -470,9 +469,76 @@ Status BrokerFileSystem::direct_download_impl(const Path& remote_file, std::stri
return Status::OK();
}
-Status BrokerFileSystem::get_client(std::shared_ptr<BrokerServiceConnection>* client) const {
- CHECK_BROKER_CLIENT(_client);
- *client = _client;
+Status BrokerFileSystem::read_file(const TBrokerFD& fd, size_t offset, size_t bytes_req,
+ std::string* data) const {
+ if (data == nullptr) {
+ return Status::InvalidArgument("data should be not null");
+ }
+ CHECK_BROKER_CLIENT(_connection);
+ TBrokerPReadRequest request;
+ request.__set_version(TBrokerVersion::VERSION_ONE);
+ request.__set_fd(fd);
+ request.__set_offset(offset);
+ request.__set_length(bytes_req);
+
+ TBrokerReadResponse response;
+ try {
+ VLOG_RPC << "send pread request to broker:" << _broker_addr << " position:" << offset
+ << ", read bytes length:" << bytes_req;
+ try {
+ (*_connection)->pread(response, request);
+ } catch (apache::thrift::transport::TTransportException& e) {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ RETURN_IF_ERROR((*_connection).reopen());
+ LOG(INFO) << "retry reading from broker: " << _broker_addr << ". reason: " << e.what();
+ (*_connection)->pread(response, request);
+ }
+ } catch (apache::thrift::TException& e) {
+ std::stringstream ss;
+ ss << "read broker file failed, broker:" << _broker_addr << " failed:" << e.what();
+ return Status::RpcError(ss.str());
+ }
+
+ if (response.opStatus.statusCode == TBrokerOperationStatusCode::END_OF_FILE) {
+ // read the end of broker's file
+ return Status::OK();
+ }
+ if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) {
+ std::stringstream ss;
+ ss << "Open broker reader failed, broker:" << _broker_addr
+ << " failed:" << response.opStatus.message;
+ return Status::InternalError(ss.str());
+ }
+ *data = std::move(response.data);
+ return Status::OK();
+}
+
+Status BrokerFileSystem::close_file(const TBrokerFD& fd) const {
+ CHECK_BROKER_CLIENT(_connection);
+ TBrokerCloseReaderRequest request;
+ request.__set_version(TBrokerVersion::VERSION_ONE);
+ request.__set_fd(fd);
+
+ TBrokerOperationStatus response;
+ try {
+ try {
+ (*_connection)->closeReader(response, request);
+ } catch (apache::thrift::transport::TTransportException&) {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ RETURN_IF_ERROR((*_connection).reopen());
+ (*_connection)->closeReader(response, request);
+ }
+ } catch (apache::thrift::TException& e) {
+ std::stringstream ss;
+ ss << "close broker file failed, broker:" << _broker_addr << " failed:" << e.what();
+ return Status::RpcError(ss.str());
+ }
+
+ if (response.statusCode != TBrokerOperationStatusCode::OK) {
+ std::stringstream ss;
+ ss << "close broker file failed, broker:" << _broker_addr << " failed:" << response.message;
+ return Status::InternalError(ss.str());
+ }
return Status::OK();
}
@@ -480,5 +546,4 @@ std::string BrokerFileSystem::error_msg(const std::string& err) const {
return fmt::format("({}:{}), {}", _broker_addr.hostname, _broker_addr.port, err);
}
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/broker_file_system.h b/be/src/io/fs/broker_file_system.h
index 1e29b10a744..49a2ced67d9 100644
--- a/be/src/io/fs/broker_file_system.h
+++ b/be/src/io/fs/broker_file_system.h
@@ -19,6 +19,7 @@
#include <stdint.h>
+#include <cstddef>
#include <map>
#include <memory>
#include <string>
@@ -44,7 +45,9 @@ public:
~BrokerFileSystem() override = default;
- Status get_client(std::shared_ptr<BrokerServiceConnection>* client) const;
+ Status read_file(const TBrokerFD& fd, size_t offset, size_t bytes_req, std::string* data) const;
+
+ Status close_file(const TBrokerFD& fd) const;
protected:
Status connect_impl() override;
@@ -80,7 +83,7 @@ private:
const TNetworkAddress& _broker_addr;
const std::map<std::string, std::string>& _broker_prop;
- std::shared_ptr<BrokerServiceConnection> _client;
+ std::unique_ptr<BrokerServiceConnection> _connection;
};
} // namespace io
} // namespace doris
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index 7ba5cafabea..ff45055ca65 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -187,6 +187,8 @@ public:
Status reopen() { return _client_cache->reopen_client(&_client, 0); }
+ inline bool is_alive() { return _client != nullptr; }
+
T* operator->() const { return _client; }
private:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]