empiredan commented on code in PR #1637:
URL:
https://github.com/apache/incubator-pegasus/pull/1637#discussion_r1357742127
##########
src/aio/file_io.cpp:
##########
@@ -26,32 +26,51 @@
#include "aio/file_io.h"
+#include <memory>
// IWYU pragma: no_include <algorithm>
#include <vector>
#include "aio/aio_provider.h"
#include "disk_engine.h"
+#include "rocksdb/env.h"
+#include "utils/fmt_logging.h"
namespace dsn {
class task_tracker;
namespace file {
-/*extern*/ disk_file *open(const char *file_name, int flag, int pmode)
+/*extern*/ disk_file *open(const std::string &fname, FileOpenType type)
{
- auto fd = disk_engine::provider().open(file_name, flag, pmode);
- if (fd.is_invalid()) {
- return nullptr;
+ switch (type) {
+ case FileOpenType::kReadOnly: {
+ auto sf = disk_engine::provider().open_read_file(fname);
+ if (!sf) {
+ return nullptr;
+ }
+ return new disk_file(std::move(sf));
}
-
- return new disk_file(fd);
+ case FileOpenType::kWriteOnly: {
+ auto wf = disk_engine::provider().open_write_file(fname);
+ if (!wf) {
+ return nullptr;
+ }
+ return new disk_file(std::move(wf));
+ }
+ default:
+ CHECK(false, "");
+ }
+ return nullptr;
}
/*extern*/ error_code close(disk_file *file)
{
- error_code result = ERR_INVALID_HANDLE;
+ error_code result = ERR_OK;
if (file != nullptr) {
- result = disk_engine::provider().close(file->native_handle());
+ // A read file is not needed to close.
Review Comment:
This means file would be closed when `delete file` ?
##########
src/aio/native_linux_aio_provider.cpp:
##########
@@ -26,107 +26,119 @@
#include "native_linux_aio_provider.h"
-#include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <memory>
-
#include "aio/aio_provider.h"
#include "aio/disk_engine.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
#include "runtime/service_engine.h"
#include "runtime/task/async_calls.h"
+#include "utils/env.h"
#include "utils/fmt_logging.h"
#include "utils/latency_tracer.h"
#include "utils/ports.h"
-#include "utils/safe_strerror_posix.h"
namespace dsn {
native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) :
aio_provider(disk) {}
native_linux_aio_provider::~native_linux_aio_provider() {}
-linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag,
int pmode)
+std::unique_ptr<rocksdb::RandomAccessFile>
+native_linux_aio_provider::open_read_file(const std::string &fname)
{
- auto fd = ::open(file_name, flag, pmode);
- if (fd == DSN_INVALID_FILE_HANDLE) {
- LOG_ERROR("create file '{}' failed, err = {}", file_name,
utils::safe_strerror(errno));
+ std::unique_ptr<rocksdb::RandomAccessFile> rfile;
+ auto s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
Review Comment:
means all files would be read or written as encrypted files ?
##########
src/aio/native_linux_aio_provider.cpp:
##########
@@ -26,107 +26,119 @@
#include "native_linux_aio_provider.h"
-#include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <memory>
-
#include "aio/aio_provider.h"
#include "aio/disk_engine.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
#include "runtime/service_engine.h"
#include "runtime/task/async_calls.h"
+#include "utils/env.h"
#include "utils/fmt_logging.h"
#include "utils/latency_tracer.h"
#include "utils/ports.h"
-#include "utils/safe_strerror_posix.h"
namespace dsn {
native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) :
aio_provider(disk) {}
native_linux_aio_provider::~native_linux_aio_provider() {}
-linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag,
int pmode)
+std::unique_ptr<rocksdb::RandomAccessFile>
+native_linux_aio_provider::open_read_file(const std::string &fname)
{
- auto fd = ::open(file_name, flag, pmode);
- if (fd == DSN_INVALID_FILE_HANDLE) {
- LOG_ERROR("create file '{}' failed, err = {}", file_name,
utils::safe_strerror(errno));
+ std::unique_ptr<rocksdb::RandomAccessFile> rfile;
+ auto s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
+ ->NewRandomAccessFile(fname, &rfile, rocksdb::EnvOptions());
+ if (!s.ok()) {
+ LOG_ERROR("open read file '{}' failed, err = {}", fname, s.ToString());
}
- return linux_fd_t(fd);
+ return rfile;
}
-error_code native_linux_aio_provider::close(linux_fd_t fd)
+std::unique_ptr<rocksdb::RandomRWFile>
+native_linux_aio_provider::open_write_file(const std::string &fname)
{
- if (fd.is_invalid() || ::close(fd.fd) == 0) {
- return ERR_OK;
+ // rocksdb::NewRandomRWFile() doesn't act as the docs described, it will
not create the
+ // file if it not exists, and an error Status will be returned, so we try
to create the
+ // file by ReopenWritableFile() if it not exist.
+ auto s =
dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)->FileExists(fname);
+ if (!s.ok() && !s.IsNotFound()) {
+ LOG_ERROR("failed to check whether the file '{}' exist, err = {}",
fname, s.ToString());
+ return nullptr;
+ }
+
+ if (s.IsNotFound()) {
+ std::unique_ptr<rocksdb::WritableFile> cfile;
+ s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
+ ->ReopenWritableFile(fname, &cfile, rocksdb::EnvOptions());
+ if (!s.ok()) {
+ LOG_ERROR("failed to create file '{}', err = {}", fname,
s.ToString());
+ return nullptr;
+ }
}
- LOG_ERROR("close file failed, err = {}", utils::safe_strerror(errno));
- return ERR_FILE_OPERATION_FAILED;
+ // Open the file for write as RandomRWFile, to support un-sequential write.
+ std::unique_ptr<rocksdb::RandomRWFile> wfile;
+ s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
+ ->NewRandomRWFile(fname, &wfile, rocksdb::EnvOptions());
+ if (!s.ok()) {
+ LOG_ERROR("open write file '{}' failed, err = {}", fname,
s.ToString());
+ }
+ return wfile;
}
-error_code native_linux_aio_provider::flush(linux_fd_t fd)
+error_code native_linux_aio_provider::close(rocksdb::RandomRWFile *wf)
{
- if (fd.is_invalid() || ::fsync(fd.fd) == 0) {
- return ERR_OK;
+ auto s = wf->Close();
+ if (!s.ok()) {
+ LOG_ERROR("close file failed, err = {}", s.ToString());
+ return ERR_FILE_OPERATION_FAILED;
}
- LOG_ERROR("flush file failed, err = {}", utils::safe_strerror(errno));
- return ERR_FILE_OPERATION_FAILED;
+ return ERR_OK;
+}
+
+error_code native_linux_aio_provider::flush(rocksdb::RandomRWFile *wf)
+{
+ auto s = wf->Fsync();
+ if (!s.ok()) {
+ LOG_ERROR("flush file failed, err = {}", s.ToString());
+ return ERR_FILE_OPERATION_FAILED;
+ }
+
+ return ERR_OK;
}
error_code native_linux_aio_provider::write(const aio_context &aio_ctx,
/*out*/ uint64_t *processed_bytes)
{
- dsn::error_code resp = ERR_OK;
- uint64_t buffer_offset = 0;
- do {
- // ret is the written data size
- auto ret = ::pwrite(aio_ctx.dfile->native_handle().fd,
- (char *)aio_ctx.buffer + buffer_offset,
- aio_ctx.buffer_size - buffer_offset,
- aio_ctx.file_offset + buffer_offset);
- if (dsn_unlikely(ret < 0)) {
- if (errno == EINTR) {
- LOG_WARNING("write failed with errno={} and will retry it.",
- utils::safe_strerror(errno));
- continue;
- }
- resp = ERR_FILE_OPERATION_FAILED;
- LOG_ERROR("write failed with errno={}, return {}.",
utils::safe_strerror(errno), resp);
- return resp;
- }
-
- buffer_offset += ret;
- if (dsn_unlikely(buffer_offset != aio_ctx.buffer_size)) {
- LOG_WARNING(
- "write incomplete, request_size={}, total_write_size={},
this_write_size={}, "
- "and will retry it.",
- aio_ctx.buffer_size,
- buffer_offset,
- ret);
- }
- } while (dsn_unlikely(buffer_offset < aio_ctx.buffer_size));
+ rocksdb::Slice data((const char *)(aio_ctx.buffer), aio_ctx.buffer_size);
+ auto s = aio_ctx.dfile->wfile()->Write(aio_ctx.file_offset, data);
+ if (!s.ok()) {
+ LOG_ERROR("write file failed, err = {}", s.ToString());
Review Comment:
Is it necessary to specify which file fails to be written ?
##########
src/aio/native_linux_aio_provider.cpp:
##########
@@ -26,107 +26,119 @@
#include "native_linux_aio_provider.h"
-#include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <memory>
-
#include "aio/aio_provider.h"
#include "aio/disk_engine.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
#include "runtime/service_engine.h"
#include "runtime/task/async_calls.h"
+#include "utils/env.h"
#include "utils/fmt_logging.h"
#include "utils/latency_tracer.h"
#include "utils/ports.h"
-#include "utils/safe_strerror_posix.h"
namespace dsn {
native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) :
aio_provider(disk) {}
native_linux_aio_provider::~native_linux_aio_provider() {}
-linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag,
int pmode)
+std::unique_ptr<rocksdb::RandomAccessFile>
+native_linux_aio_provider::open_read_file(const std::string &fname)
{
- auto fd = ::open(file_name, flag, pmode);
- if (fd == DSN_INVALID_FILE_HANDLE) {
- LOG_ERROR("create file '{}' failed, err = {}", file_name,
utils::safe_strerror(errno));
+ std::unique_ptr<rocksdb::RandomAccessFile> rfile;
+ auto s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
+ ->NewRandomAccessFile(fname, &rfile, rocksdb::EnvOptions());
+ if (!s.ok()) {
+ LOG_ERROR("open read file '{}' failed, err = {}", fname, s.ToString());
}
- return linux_fd_t(fd);
+ return rfile;
}
-error_code native_linux_aio_provider::close(linux_fd_t fd)
+std::unique_ptr<rocksdb::RandomRWFile>
+native_linux_aio_provider::open_write_file(const std::string &fname)
{
- if (fd.is_invalid() || ::close(fd.fd) == 0) {
- return ERR_OK;
+ // rocksdb::NewRandomRWFile() doesn't act as the docs described, it will
not create the
+ // file if it not exists, and an error Status will be returned, so we try
to create the
+ // file by ReopenWritableFile() if it not exist.
+ auto s =
dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)->FileExists(fname);
+ if (!s.ok() && !s.IsNotFound()) {
+ LOG_ERROR("failed to check whether the file '{}' exist, err = {}",
fname, s.ToString());
+ return nullptr;
+ }
+
+ if (s.IsNotFound()) {
+ std::unique_ptr<rocksdb::WritableFile> cfile;
+ s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
+ ->ReopenWritableFile(fname, &cfile, rocksdb::EnvOptions());
+ if (!s.ok()) {
+ LOG_ERROR("failed to create file '{}', err = {}", fname,
s.ToString());
+ return nullptr;
+ }
}
- LOG_ERROR("close file failed, err = {}", utils::safe_strerror(errno));
- return ERR_FILE_OPERATION_FAILED;
+ // Open the file for write as RandomRWFile, to support un-sequential write.
+ std::unique_ptr<rocksdb::RandomRWFile> wfile;
+ s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
+ ->NewRandomRWFile(fname, &wfile, rocksdb::EnvOptions());
+ if (!s.ok()) {
+ LOG_ERROR("open write file '{}' failed, err = {}", fname,
s.ToString());
+ }
+ return wfile;
}
-error_code native_linux_aio_provider::flush(linux_fd_t fd)
+error_code native_linux_aio_provider::close(rocksdb::RandomRWFile *wf)
{
- if (fd.is_invalid() || ::fsync(fd.fd) == 0) {
- return ERR_OK;
+ auto s = wf->Close();
+ if (!s.ok()) {
+ LOG_ERROR("close file failed, err = {}", s.ToString());
+ return ERR_FILE_OPERATION_FAILED;
}
- LOG_ERROR("flush file failed, err = {}", utils::safe_strerror(errno));
- return ERR_FILE_OPERATION_FAILED;
+ return ERR_OK;
+}
+
+error_code native_linux_aio_provider::flush(rocksdb::RandomRWFile *wf)
+{
+ auto s = wf->Fsync();
+ if (!s.ok()) {
+ LOG_ERROR("flush file failed, err = {}", s.ToString());
+ return ERR_FILE_OPERATION_FAILED;
+ }
+
+ return ERR_OK;
}
error_code native_linux_aio_provider::write(const aio_context &aio_ctx,
/*out*/ uint64_t *processed_bytes)
{
- dsn::error_code resp = ERR_OK;
- uint64_t buffer_offset = 0;
- do {
- // ret is the written data size
- auto ret = ::pwrite(aio_ctx.dfile->native_handle().fd,
- (char *)aio_ctx.buffer + buffer_offset,
- aio_ctx.buffer_size - buffer_offset,
- aio_ctx.file_offset + buffer_offset);
- if (dsn_unlikely(ret < 0)) {
- if (errno == EINTR) {
- LOG_WARNING("write failed with errno={} and will retry it.",
- utils::safe_strerror(errno));
- continue;
- }
- resp = ERR_FILE_OPERATION_FAILED;
- LOG_ERROR("write failed with errno={}, return {}.",
utils::safe_strerror(errno), resp);
- return resp;
- }
-
- buffer_offset += ret;
- if (dsn_unlikely(buffer_offset != aio_ctx.buffer_size)) {
- LOG_WARNING(
- "write incomplete, request_size={}, total_write_size={},
this_write_size={}, "
- "and will retry it.",
- aio_ctx.buffer_size,
- buffer_offset,
- ret);
- }
- } while (dsn_unlikely(buffer_offset < aio_ctx.buffer_size));
+ rocksdb::Slice data((const char *)(aio_ctx.buffer), aio_ctx.buffer_size);
+ auto s = aio_ctx.dfile->wfile()->Write(aio_ctx.file_offset, data);
+ if (!s.ok()) {
+ LOG_ERROR("write file failed, err = {}", s.ToString());
+ return ERR_FILE_OPERATION_FAILED;
+ }
- *processed_bytes = buffer_offset;
- return resp;
+ *processed_bytes = aio_ctx.buffer_size;
+ return ERR_OK;
}
error_code native_linux_aio_provider::read(const aio_context &aio_ctx,
/*out*/ uint64_t *processed_bytes)
{
- auto ret = ::pread(aio_ctx.dfile->native_handle().fd,
- aio_ctx.buffer,
- aio_ctx.buffer_size,
- aio_ctx.file_offset);
- if (dsn_unlikely(ret < 0)) {
- LOG_WARNING("write failed with errno={} and will retry it.",
utils::safe_strerror(errno));
+ rocksdb::Slice result;
+ auto s = aio_ctx.dfile->rfile()->Read(
+ aio_ctx.file_offset, aio_ctx.buffer_size, &result, (char
*)(aio_ctx.buffer));
+ if (!s.ok()) {
+ LOG_ERROR("read file failed, err = {}", s.ToString());
Review Comment:
Ditto.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]