This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new c7bf911a Rewrite *write function (#1045)
c7bf911a is described below

commit c7bf911a2fe515d963aa21d20ee46f8a8cb90d27
Author: xiaobiaozhao <[email protected]>
AuthorDate: Fri Oct 28 10:04:27 2022 +0800

    Rewrite *write function (#1045)
    
    Co-authored-by: Ruixiang Tan <[email protected]>
---
 src/cluster/replication.cc          |  2 +-
 src/commands/redis_cmd.cc           |  5 ++---
 src/common/util.cc                  | 24 ++++++++++++++++++++++++
 src/common/util.h                   |  4 +++-
 src/main.cc                         |  2 +-
 src/server/worker.cc                |  4 ++--
 tools/kvrocks2redis/main.cc         |  2 +-
 tools/kvrocks2redis/redis_writer.cc |  2 +-
 tools/kvrocks2redis/sync.cc         |  2 +-
 tools/kvrocks2redis/writer.cc       |  6 ++++--
 10 files changed, 40 insertions(+), 13 deletions(-)

diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc
index 31bd5958..5e20aa9a 100644
--- a/src/cluster/replication.cc
+++ b/src/cluster/replication.cc
@@ -51,7 +51,7 @@ Status FeedSlaveThread::Start() {
       sigaddset(&mask, SIGHUP);
       sigaddset(&mask, SIGPIPE);
       pthread_sigmask(SIG_BLOCK, &mask, &omask);
-      write(conn_->GetFD(), "+OK\r\n", 5);
+      Util::SockSend(conn_->GetFD(), "+OK\r\n");
       this->loop();
     });
   } catch (const std::system_error &e) {
diff --git a/src/commands/redis_cmd.cc b/src/commands/redis_cmd.cc
index 37e90041..903ca8fa 100644
--- a/src/commands/redis_cmd.cc
+++ b/src/commands/redis_cmd.cc
@@ -3954,7 +3954,7 @@ class CommandPSync : public Commander {
     Status s = svr->AddSlave(conn, next_repl_seq);
     if (!s.IsOK()) {
       std::string err = "-ERR " + s.Msg() + "\r\n";
-      write(conn->GetFD(), err.c_str(), err.length());
+      Util::SockSend(conn->GetFD(), err);
       conn->EnableFlag(Redis::Connection::kCloseAsync);
       LOG(WARNING) << "Failed to add salve: " << conn->GetAddr() << " to start 
increment syncing";
     } else {
@@ -4638,8 +4638,7 @@ class CommandFetchMeta : public Commander {
       std::string files;
       auto s = 
Engine::Storage::ReplDataManager::GetFullReplDataInfo(svr->storage_, &files);
       if (!s.IsOK()) {
-        const char *message = "-ERR can't create db checkpoint";
-        write(repl_fd, message, strlen(message));
+        Util::SockSend(repl_fd, "-ERR can't create db checkpoint");
         LOG(WARNING) << "[replication] Failed to get full data file info,"
                      << " error: " << s.Msg();
         return;
diff --git a/src/common/util.cc b/src/common/util.cc
index 2be0c10c..29a9009a 100644
--- a/src/common/util.cc
+++ b/src/common/util.cc
@@ -672,4 +672,28 @@ int aeWait(int fd, int mask, uint64_t timeout) {
     return retval;
   }
 }
+
+Status Write(int fd, const std::string &data) {
+  ssize_t n = 0;
+  while (n < static_cast<ssize_t>(data.size())) {
+    ssize_t nwritten = write(fd, data.c_str() + n, data.size() - n);
+    if (nwritten == -1) {
+      return Status(Status::NotOK, strerror(errno));
+    }
+    n += nwritten;
+  }
+  return Status::OK();
+}
+
+Status Pwrite(int fd, const std::string &data, off_t offset) {
+  ssize_t n = 0;
+  while (n < static_cast<ssize_t>(data.size())) {
+    ssize_t nwritten = pwrite(fd, data.c_str() + n, data.size() - n, offset);
+    if (nwritten == -1) {
+      return Status(Status::NotOK, strerror(errno));
+    }
+    n += nwritten;
+  }
+  return Status::OK();
+}
 }  // namespace Util
diff --git a/src/common/util.h b/src/common/util.h
index 749ac0bc..5fcb8a7d 100644
--- a/src/common/util.h
+++ b/src/common/util.h
@@ -75,5 +75,7 @@ auto GetTimeStamp() {
 }
 inline uint64_t GetTimeStampMS() { return 
GetTimeStamp<std::chrono::milliseconds>(); }
 inline uint64_t GetTimeStampUS() { return 
GetTimeStamp<std::chrono::microseconds>(); }
-
+// file util
+Status Write(int fd, const std::string &data);
+Status Pwrite(int fd, const std::string &data, off_t offset);
 }  // namespace Util
diff --git a/src/main.cc b/src/main.cc
index 9fa195c8..22a3fb6e 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -250,7 +250,7 @@ static Status createPidFile(const std::string &path) {
     return Status(Status::NotOK, strerror(errno));
   }
   std::string pid_str = std::to_string(getpid());
-  write(*fd, pid_str.data(), pid_str.size());
+  Util::Write(*fd, pid_str);
   return Status::OK();
 }
 
diff --git a/src/server/worker.cc b/src/server/worker.cc
index 50fc1325..4b64d4f6 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -157,7 +157,7 @@ void Worker::newTCPConnection(evconnlistener *listener, 
evutil_socket_t fd, sock
   Status status = worker->AddConnection(conn);
   if (!status.IsOK()) {
     std::string err_msg = Redis::Error("ERR " + status.Msg());
-    write(fd, err_msg.data(), err_msg.size());
+    Util::SockSend(fd, err_msg);
     conn->Close();
     return;
   }
@@ -185,7 +185,7 @@ void Worker::newUnixSocketConnection(evconnlistener 
*listener, evutil_socket_t f
   Status status = worker->AddConnection(conn);
   if (!status.IsOK()) {
     std::string err_msg = Redis::Error("ERR " + status.Msg());
-    write(fd, err_msg.data(), err_msg.size());
+    Util::SockSend(fd, err_msg);
     conn->Close();
     return;
   }
diff --git a/tools/kvrocks2redis/main.cc b/tools/kvrocks2redis/main.cc
index 79a7101c..b9d57a37 100644
--- a/tools/kvrocks2redis/main.cc
+++ b/tools/kvrocks2redis/main.cc
@@ -87,7 +87,7 @@ static Status createPidFile(const std::string &path) {
     return Status(Status::NotOK, strerror(errno));
   }
   std::string pid_str = std::to_string(getpid());
-  write(fd, pid_str.data(), pid_str.size());
+  Util::Write(fd, pid_str);
   close(fd);
   return Status::OK();
 }
diff --git a/tools/kvrocks2redis/redis_writer.cc 
b/tools/kvrocks2redis/redis_writer.cc
index 48a5ad19..ed23eb68 100644
--- a/tools/kvrocks2redis/redis_writer.cc
+++ b/tools/kvrocks2redis/redis_writer.cc
@@ -234,7 +234,7 @@ Status RedisWriter::writeNextOffsetToFile(const std::string 
&ns, std::istream::o
     offset_string += " ";
   }
   offset_string += '\0';
-  pwrite(next_offset_fds_[ns], offset_string.data(), offset_string.size(), 0);
+  Util::Pwrite(next_offset_fds_[ns], offset_string, 0);
   return Status::OK();
 }
 
diff --git a/tools/kvrocks2redis/sync.cc b/tools/kvrocks2redis/sync.cc
index 238ee1e6..1c65f8d8 100644
--- a/tools/kvrocks2redis/sync.cc
+++ b/tools/kvrocks2redis/sync.cc
@@ -251,6 +251,6 @@ Status Sync::writeNextSeqToFile(rocksdb::SequenceNumber 
seq) {
     seq_string += " ";
   }
   seq_string += '\0';
-  pwrite(next_seq_fd_, seq_string.data(), seq_string.size(), 0);
+  Util::Pwrite(next_seq_fd_, seq_string, 0);
   return Status::OK();
 }
diff --git a/tools/kvrocks2redis/writer.cc b/tools/kvrocks2redis/writer.cc
index 29756ac2..d7680cdb 100644
--- a/tools/kvrocks2redis/writer.cc
+++ b/tools/kvrocks2redis/writer.cc
@@ -25,6 +25,8 @@
 
 #include <cstring>
 
+#include "util.h"
+
 Writer::~Writer() {
   for (const auto &iter : aof_fds_) {
     close(iter.second);
@@ -36,8 +38,8 @@ Status Writer::Write(const std::string &ns, const 
std::vector<std::string> &aofs
   if (!s.IsOK()) {
     return Status(Status::NotOK, s.Msg());
   }
-  for (size_t i = 0; i < aofs.size(); i++) {
-    write(aof_fds_[ns], aofs[i].data(), aofs[i].size());
+  for (const auto &aof : aofs) {
+    Util::Write(aof_fds_[ns], aof);
   }
 
   return Status::OK();

Reply via email to