This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new c7bf911a Rewrite *write function (#1045)
c7bf911a is described below
commit c7bf911a2fe515d963aa21d20ee46f8a8cb90d27
Author: xiaobiaozhao <[email protected]>
AuthorDate: Fri Oct 28 10:04:27 2022 +0800
Rewrite *write function (#1045)
Co-authored-by: Ruixiang Tan <[email protected]>
---
src/cluster/replication.cc | 2 +-
src/commands/redis_cmd.cc | 5 ++---
src/common/util.cc | 24 ++++++++++++++++++++++++
src/common/util.h | 4 +++-
src/main.cc | 2 +-
src/server/worker.cc | 4 ++--
tools/kvrocks2redis/main.cc | 2 +-
tools/kvrocks2redis/redis_writer.cc | 2 +-
tools/kvrocks2redis/sync.cc | 2 +-
tools/kvrocks2redis/writer.cc | 6 ++++--
10 files changed, 40 insertions(+), 13 deletions(-)
diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 31bd5958..5e20aa9a 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -51,7 +51,7 @@ Status FeedSlaveThread::Start() {
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &mask, &omask);
- write(conn_->GetFD(), "+OK\r\n", 5);
+ Util::SockSend(conn_->GetFD(), "+OK\r\n");
this->loop();
});
} catch (const std::system_error &e) {
diff --git a/src/commands/redis_cmd.cc b/src/commands/redis_cmd.cc
index 37e90041..903ca8fa 100644
--- a/src/commands/redis_cmd.cc
+++ b/src/commands/redis_cmd.cc
@@ -3954,7 +3954,7 @@ class CommandPSync : public Commander {
Status s = svr->AddSlave(conn, next_repl_seq);
if (!s.IsOK()) {
std::string err = "-ERR " + s.Msg() + "\r\n";
- write(conn->GetFD(), err.c_str(), err.length());
+ Util::SockSend(conn->GetFD(), err);
conn->EnableFlag(Redis::Connection::kCloseAsync);
LOG(WARNING) << "Failed to add salve: " << conn->GetAddr() << " to start
increment syncing";
} else {
@@ -4638,8 +4638,7 @@ class CommandFetchMeta : public Commander {
std::string files;
auto s =
Engine::Storage::ReplDataManager::GetFullReplDataInfo(svr->storage_, &files);
if (!s.IsOK()) {
- const char *message = "-ERR can't create db checkpoint";
- write(repl_fd, message, strlen(message));
+ Util::SockSend(repl_fd, "-ERR can't create db checkpoint");
LOG(WARNING) << "[replication] Failed to get full data file info,"
<< " error: " << s.Msg();
return;
diff --git a/src/common/util.cc b/src/common/util.cc
index 2be0c10c..29a9009a 100644
--- a/src/common/util.cc
+++ b/src/common/util.cc
@@ -672,4 +672,28 @@ int aeWait(int fd, int mask, uint64_t timeout) {
return retval;
}
}
+
+// Writes the whole of `data` to `fd`.
+//
+// write(2) may accept fewer bytes than requested, so the call is repeated on
+// the unwritten tail until everything has been handed to the kernel.
+//
+// Returns Status::OK() once every byte has been written, or a NotOK status
+// carrying strerror(errno) as soon as write(2) fails with anything other than
+// EINTR.
+Status Write(int fd, const std::string &data) {
+  size_t written = 0;
+  while (written < data.size()) {
+    ssize_t nwritten = write(fd, data.data() + written, data.size() - written);
+    if (nwritten == -1) {
+      // Interrupted by a signal before any byte was transferred: retry instead of failing.
+      if (errno == EINTR) continue;
+      return Status(Status::NotOK, strerror(errno));
+    }
+    written += static_cast<size_t>(nwritten);
+  }
+  return Status::OK();
+}
+
+// Writes the whole of `data` to `fd` starting at file position `offset`,
+// without moving the descriptor's own file offset (pwrite(2) semantics).
+//
+// pwrite(2) may accept fewer bytes than requested, so the call is repeated on
+// the unwritten tail; each retry targets `offset` plus the number of bytes
+// already written so the tail lands after the data written so far rather than
+// on top of it.
+//
+// Returns Status::OK() once every byte has been written, or a NotOK status
+// carrying strerror(errno) as soon as pwrite(2) fails with anything other than
+// EINTR.
+Status Pwrite(int fd, const std::string &data, off_t offset) {
+  size_t written = 0;
+  while (written < data.size()) {
+    ssize_t nwritten =
+        pwrite(fd, data.data() + written, data.size() - written, offset + static_cast<off_t>(written));
+    if (nwritten == -1) {
+      // Interrupted by a signal before any byte was transferred: retry instead of failing.
+      if (errno == EINTR) continue;
+      return Status(Status::NotOK, strerror(errno));
+    }
+    written += static_cast<size_t>(nwritten);
+  }
+  return Status::OK();
+}
} // namespace Util
diff --git a/src/common/util.h b/src/common/util.h
index 749ac0bc..5fcb8a7d 100644
--- a/src/common/util.h
+++ b/src/common/util.h
@@ -75,5 +75,7 @@ auto GetTimeStamp() {
}
inline uint64_t GetTimeStampMS() { return
GetTimeStamp<std::chrono::milliseconds>(); }
inline uint64_t GetTimeStampUS() { return
GetTimeStamp<std::chrono::microseconds>(); }
-
+// file util: helpers that write an entire std::string to a file descriptor
+Status Write(int fd, const std::string &data);  // loops over write() until all of data is written or a call fails
+Status Pwrite(int fd, const std::string &data, off_t offset);  // same loop, but issues pwrite() with `offset`
} // namespace Util
diff --git a/src/main.cc b/src/main.cc
index 9fa195c8..22a3fb6e 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -250,7 +250,7 @@ static Status createPidFile(const std::string &path) {
return Status(Status::NotOK, strerror(errno));
}
std::string pid_str = std::to_string(getpid());
- write(*fd, pid_str.data(), pid_str.size());
+ Util::Write(*fd, pid_str);
return Status::OK();
}
diff --git a/src/server/worker.cc b/src/server/worker.cc
index 50fc1325..4b64d4f6 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -157,7 +157,7 @@ void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sock
Status status = worker->AddConnection(conn);
if (!status.IsOK()) {
std::string err_msg = Redis::Error("ERR " + status.Msg());
- write(fd, err_msg.data(), err_msg.size());
+ Util::SockSend(fd, err_msg);
conn->Close();
return;
}
@@ -185,7 +185,7 @@ void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t f
Status status = worker->AddConnection(conn);
if (!status.IsOK()) {
std::string err_msg = Redis::Error("ERR " + status.Msg());
- write(fd, err_msg.data(), err_msg.size());
+ Util::SockSend(fd, err_msg);
conn->Close();
return;
}
diff --git a/tools/kvrocks2redis/main.cc b/tools/kvrocks2redis/main.cc
index 79a7101c..b9d57a37 100644
--- a/tools/kvrocks2redis/main.cc
+++ b/tools/kvrocks2redis/main.cc
@@ -87,7 +87,7 @@ static Status createPidFile(const std::string &path) {
return Status(Status::NotOK, strerror(errno));
}
std::string pid_str = std::to_string(getpid());
- write(fd, pid_str.data(), pid_str.size());
+ Util::Write(fd, pid_str);
close(fd);
return Status::OK();
}
diff --git a/tools/kvrocks2redis/redis_writer.cc b/tools/kvrocks2redis/redis_writer.cc
index 48a5ad19..ed23eb68 100644
--- a/tools/kvrocks2redis/redis_writer.cc
+++ b/tools/kvrocks2redis/redis_writer.cc
@@ -234,7 +234,7 @@ Status RedisWriter::writeNextOffsetToFile(const std::string &ns, std::istream::o
offset_string += " ";
}
offset_string += '\0';
- pwrite(next_offset_fds_[ns], offset_string.data(), offset_string.size(), 0);
+ Util::Pwrite(next_offset_fds_[ns], offset_string, 0);
return Status::OK();
}
diff --git a/tools/kvrocks2redis/sync.cc b/tools/kvrocks2redis/sync.cc
index 238ee1e6..1c65f8d8 100644
--- a/tools/kvrocks2redis/sync.cc
+++ b/tools/kvrocks2redis/sync.cc
@@ -251,6 +251,6 @@ Status Sync::writeNextSeqToFile(rocksdb::SequenceNumber seq) {
seq_string += " ";
}
seq_string += '\0';
- pwrite(next_seq_fd_, seq_string.data(), seq_string.size(), 0);
+ Util::Pwrite(next_seq_fd_, seq_string, 0);
return Status::OK();
}
diff --git a/tools/kvrocks2redis/writer.cc b/tools/kvrocks2redis/writer.cc
index 29756ac2..d7680cdb 100644
--- a/tools/kvrocks2redis/writer.cc
+++ b/tools/kvrocks2redis/writer.cc
@@ -25,6 +25,8 @@
#include <cstring>
+#include "util.h"
+
Writer::~Writer() {
for (const auto &iter : aof_fds_) {
close(iter.second);
@@ -36,8 +38,8 @@ Status Writer::Write(const std::string &ns, const std::vector<std::string> &aofs
if (!s.IsOK()) {
return Status(Status::NotOK, s.Msg());
}
- for (size_t i = 0; i < aofs.size(); i++) {
- write(aof_fds_[ns], aofs[i].data(), aofs[i].size());
+ for (const auto &aof : aofs) {
+ Util::Write(aof_fds_[ns], aof);
}
return Status::OK();