This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 65a939cd574 [fix](file-writer) avoid empty file for segment writer (#31355)
65a939cd574 is described below
commit 65a939cd5746cd18202f2fea80fcae7580b66bdc
Author: Mingyu Chen <[email protected]>
AuthorDate: Sun Feb 25 21:39:48 2024 +0800
[fix](file-writer) avoid empty file for segment writer (#31355)
bp #31169 (backport of #31169 to branch-2.0)
---
be/src/io/fs/broker_file_system.cpp | 2 +-
be/src/io/fs/broker_file_writer.cpp | 9 ++++++---
be/src/io/fs/broker_file_writer.h | 2 +-
be/src/io/fs/file_writer.h | 1 +
be/src/io/fs/file_writer_options.h | 2 ++
be/src/io/fs/hdfs_file_system.cpp | 2 +-
be/src/io/fs/hdfs_file_writer.cpp | 6 ++++--
be/src/io/fs/hdfs_file_writer.h | 2 +-
be/src/io/fs/s3_file_system.cpp | 2 +-
be/src/io/fs/s3_file_writer.cpp | 13 ++++++++-----
be/src/io/fs/s3_file_writer.h | 2 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 3 ++-
.../rowset/segment_v2/inverted_index_compound_directory.cpp | 3 ++-
be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 3 ++-
be/src/olap/tablet.cpp | 3 ++-
15 files changed, 35 insertions(+), 20 deletions(-)
diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
index a3c93c04a7a..f0d6bc12380 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -97,7 +97,7 @@ Status BrokerFileSystem::connect_impl() {
Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr*
writer,
const FileWriterOptions* opts) {
*writer = std::make_unique<BrokerFileWriter>(ExecEnv::GetInstance(),
_broker_addr, _broker_prop,
- path, 0 /* offset */,
getSPtr());
+ path, 0 /* offset */,
getSPtr(), opts);
return Status::OK();
}
diff --git a/be/src/io/fs/broker_file_writer.cpp b/be/src/io/fs/broker_file_writer.cpp
index daba1af2bce..a46d22e1505 100644
--- a/be/src/io/fs/broker_file_writer.cpp
+++ b/be/src/io/fs/broker_file_writer.cpp
@@ -37,12 +37,15 @@ namespace io {
BrokerFileWriter::BrokerFileWriter(ExecEnv* env, const TNetworkAddress&
broker_address,
const std::map<std::string, std::string>&
properties,
- const std::string& path, int64_t
start_offset, FileSystemSPtr fs)
+ const std::string& path, int64_t
start_offset, FileSystemSPtr fs,
+ const FileWriterOptions* opts)
: FileWriter(path, fs),
_env(env),
_address(broker_address),
_properties(properties),
- _cur_offset(start_offset) {}
+ _cur_offset(start_offset) {
+ _create_empty_file = opts ? opts->create_empty_file : true;
+}
BrokerFileWriter::~BrokerFileWriter() {
if (_opened) {
@@ -159,7 +162,7 @@ Status BrokerFileWriter::finalize() {
}
Status BrokerFileWriter::open() {
- if (!_opened) {
+ if (_create_empty_file && !_opened) {
RETURN_IF_ERROR(_open());
_opened = true;
}
diff --git a/be/src/io/fs/broker_file_writer.h b/be/src/io/fs/broker_file_writer.h
index e3e53525679..3e8edab0078 100644
--- a/be/src/io/fs/broker_file_writer.h
+++ b/be/src/io/fs/broker_file_writer.h
@@ -42,7 +42,7 @@ class BrokerFileWriter : public FileWriter {
public:
BrokerFileWriter(ExecEnv* env, const TNetworkAddress& broker_address,
const std::map<std::string, std::string>& properties,
const std::string& path,
- int64_t start_offset, FileSystemSPtr fs);
+ int64_t start_offset, FileSystemSPtr fs, const
FileWriterOptions* opts);
virtual ~BrokerFileWriter();
Status open() override;
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 03f092c0424..1fd9b8391d9 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -67,6 +67,7 @@ protected:
FileSystemSPtr _fs;
bool _closed = false;
bool _opened = false;
+ bool _create_empty_file = true;
};
} // namespace io
diff --git a/be/src/io/fs/file_writer_options.h b/be/src/io/fs/file_writer_options.h
index 511bd81a168..4af38092373 100644
--- a/be/src/io/fs/file_writer_options.h
+++ b/be/src/io/fs/file_writer_options.h
@@ -26,6 +26,8 @@ struct FileWriterOptions {
bool is_cold_data = false;
bool sync_file_data = true; // Whether flush data into storage
system
int64_t file_cache_expiration = 0; // Absolute time
+ // Whether to create empty file if no content
+ bool create_empty_file = true;
};
} // namespace io
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index 6b44b219128..d3d54527836 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -167,7 +167,7 @@ Status HdfsFileSystem::connect_impl() {
Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr*
writer,
const FileWriterOptions* opts) {
- *writer = std::make_unique<HdfsFileWriter>(file, getSPtr());
+ *writer = std::make_unique<HdfsFileWriter>(file, getSPtr(), opts);
return Status::OK();
}
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index a5f5dab9fd4..fe4b6cde199 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -34,7 +34,9 @@
namespace doris {
namespace io {
-HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs) :
FileWriter(std::move(file), fs) {
+HdfsFileWriter::HdfsFileWriter(Path file, FileSystemSPtr fs, const
FileWriterOptions* opts)
+ : FileWriter(std::move(file), fs) {
+ _create_empty_file = opts ? opts->create_empty_file : true;
_hdfs_fs = (HdfsFileSystem*)_fs.get();
}
@@ -109,7 +111,7 @@ Status HdfsFileWriter::finalize() {
}
Status HdfsFileWriter::open() {
- if (!_opened) {
+ if (_create_empty_file && !_opened) {
RETURN_IF_ERROR(_open());
_opened = true;
}
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 812598e7a51..e62c6d6367e 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -33,7 +33,7 @@ class HdfsFileSystem;
class HdfsFileWriter : public FileWriter {
public:
- HdfsFileWriter(Path file, FileSystemSPtr fs);
+ HdfsFileWriter(Path file, FileSystemSPtr fs, const FileWriterOptions*
opts);
~HdfsFileWriter();
Status open() override;
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index a92539713e2..cad49b4555c 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -131,7 +131,7 @@ Status S3FileSystem::connect_impl() {
Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
GET_KEY(key, file);
- *writer = std::make_unique<S3FileWriter>(key, get_client(), _s3_conf,
getSPtr());
+ *writer = std::make_unique<S3FileWriter>(key, get_client(), _s3_conf,
getSPtr(), opts);
return Status::OK();
}
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 78c8f9355c9..aa7dcb573ea 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -78,7 +78,7 @@ bvar::Adder<uint64_t> s3_file_created_total("s3_file_writer",
"file_created");
bvar::Adder<uint64_t> s3_file_being_written("s3_file_writer",
"file_being_written");
S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const
S3Conf& s3_conf,
- FileSystemSPtr fs)
+ FileSystemSPtr fs, const FileWriterOptions* opts)
: FileWriter(Path(s3_conf.endpoint) / s3_conf.bucket / path,
std::move(fs)),
_bucket(s3_conf.bucket),
_key(std::move(path)),
@@ -87,6 +87,7 @@ S3FileWriter::S3FileWriter(Path path,
std::shared_ptr<S3Client> client, const S3
s3_file_writer_total << 1;
s3_file_being_written << 1;
+ _create_empty_file = opts ? opts->create_empty_file : true;
Aws::Http::SetCompliantRfc3986Encoding(true);
}
@@ -195,7 +196,7 @@ Status S3FileWriter::close() {
// it might be one file less than 5MB, we do upload here
_pending_buf->set_upload_remote_callback(
[this, buf = _pending_buf]() { _put_object(*buf); });
- } else {
+ } else if (_create_empty_file) {
// if there is no pending buffer, we need to create an empty file
_pending_buf = S3FileBufferPool::GetInstance()->allocate();
// if there is no upload id, we need to create a new one
@@ -211,9 +212,11 @@ Status S3FileWriter::close() {
});
}
}
- _countdown_event.add_count();
- _pending_buf->submit();
- _pending_buf = nullptr;
+ if (_pending_buf != nullptr) {
+ _countdown_event.add_count();
+ _pending_buf->submit();
+ _pending_buf = nullptr;
+ }
RETURN_IF_ERROR(_complete());
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 2c139242ed4..ab4c1f9f47c 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -46,7 +46,7 @@ struct S3FileBuffer;
class S3FileWriter final : public FileWriter {
public:
S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client, const
S3Conf& s3_conf,
- FileSystemSPtr fs);
+ FileSystemSPtr fs, const FileWriterOptions* opts);
~S3FileWriter() override;
Status close() override;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index c90cd6ba079..2b47b3aaed8 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -706,7 +706,8 @@ Status BetaRowsetWriter::_do_create_segment_writer(
return Status::Error<INIT_FAILED>("get fs failed");
}
io::FileWriterPtr file_writer;
- Status st = fs->create_file(path, &file_writer);
+ io::FileWriterOptions opts {.create_empty_file = false};
+ Status st = fs->create_file(path, &file_writer, &opts);
if (!st.ok()) {
LOG(WARNING) << "failed to create writable file. path=" << path << ",
err: " << st;
return st;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
index 1af26a57674..e7f8f6abe26 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -596,7 +596,8 @@ void DorisCompoundDirectory::touchFile(const char* name) {
snprintf(buffer, CL_MAX_DIR, "%s%s%s", directory.c_str(), PATH_DELIMITERA,
name);
io::FileWriterPtr tmp_writer;
- LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer), "Touch file
IO error")
+ io::FileWriterOptions opts {.create_empty_file = false};
+ LOG_AND_THROW_IF_ERROR(fs->create_file(buffer, &tmp_writer, &opts), "Touch
file IO error")
}
int64_t DorisCompoundDirectory::fileLength(const char* name) const {
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index 10cef8e1675..33d638b63f3 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -182,7 +182,8 @@ Status VerticalBetaRowsetWriter::_create_segment_writer(
return Status::Error<INIT_FAILED>("get fs failed");
}
io::FileWriterPtr file_writer;
- Status st = fs->create_file(path, &file_writer);
+ io::FileWriterOptions opts {.create_empty_file = false};
+ Status st = fs->create_file(path, &file_writer, &opts);
if (!st.ok()) {
LOG(WARNING) << "failed to create writable file. path=" << path << ",
err: " << st;
return st;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 61fe618f032..47e089add41 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2267,8 +2267,9 @@ Status Tablet::write_cooldown_meta() {
std::string remote_meta_path =
remote_tablet_meta_path(tablet_id(), _cooldown_replica_id,
_cooldown_term);
io::FileWriterPtr tablet_meta_writer;
+ io::FileWriterOptions opts {.create_empty_file = false};
// FIXME(plat1ko): What if object store permanently unavailable?
- RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer));
+ RETURN_IF_ERROR(fs->create_file(remote_meta_path, &tablet_meta_writer,
&opts));
auto val = tablet_meta_pb.SerializeAsString();
RETURN_IF_ERROR(tablet_meta_writer->append({val.data(), val.size()}));
return tablet_meta_writer->close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]