mapleFU commented on code in PR #2262:
URL: https://github.com/apache/kvrocks/pull/2262#discussion_r1590313270


##########
src/storage/redis_db.h:
##########
@@ -21,15 +21,46 @@
 #pragma once
 
 #include <map>
+#include <optional>
 #include <string>
 #include <utility>
+#include <variant>
 #include <vector>
 
 #include "redis_metadata.h"
+#include "server/redis_reply.h"
 #include "storage.h"
 
 namespace redis {
 
+struct SortArgument {
+  std::string sortby;                    // BY
+  bool dontsort = false;                 // DONT SORT
+  int offset = 0;                        // LIMIT OFFSET
+  int count = -1;                        // LIMIT COUNT
+  std::vector<std::string> getpatterns;  // GET
+  bool desc = false;                     // ASC/DESC
+  bool alpha = false;                    // ALPHA
+  std::string storekey;                  // STORE
+};
+
+struct RedisSortObject {
+  std::string obj;
+  std::variant<double, std::string> v;
+
+  /// SortCompare is a helper function that enables `RedisSortObject` to be 
sorted based on `SortArgument`.
+  ///
+  /// It can assist in implementing the third parameter `Compare comp` 
required by `std::sort`
+  ///
+  /// \param args The basis used to compare two RedisSortObjects.
+  /// If `args.alpha` is false, `RedisSortObject.v` will be taken as double 
for comparison
+  /// If `args.alpha` is true and `args.sortby` is not empty, 
`RedisSortObject.v` will be taken as string for comparison
+  /// If `args.alpha` is true and `args.sortby` is empty, the comparison is by 
`RedisSortObject.obj`.

Review Comment:
   So why do here has `std::variant` and a `std::string`? We should use only 
`variant` or `double + string` composite?



##########
src/storage/redis_db.cc:
##########
@@ -768,4 +773,197 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::optional<std::string> Database::lookupKeyByPattern(const std::string 
&pattern, const std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return std::nullopt;
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisHash) {
+      return std::nullopt;
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisString) {
+      return std::nullopt;
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
const SortArgument &args,
+                               std::vector<std::optional<std::string>> *elems, 
SortResult *res) {
+  // Obtain the length of the object to sort.
+  const std::string ns_key = AppendNamespacePrefix(key);
+  Metadata metadata(type, false);
+  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int vectorlen = static_cast<int>(metadata.size);

Review Comment:
   Nit: we can limit the vectorlen. Currently kvrocks don't have memory limit 
logic, so a large length can easily crack the system? We can first limit the 
length to 512, and enlarge the size when we have better machanism



##########
src/storage/redis_db.h:
##########
@@ -119,6 +153,21 @@ class Database {
   // Already internal keys
   [[nodiscard]] rocksdb::Status existsInternal(const std::vector<std::string> 
&keys, int *ret);
   [[nodiscard]] rocksdb::Status typeInternal(const Slice &key, RedisType 
*type);
+
+  /// lookupKeyByPattern is a helper function of `Sort` to support `GET` and 
`BY` fields.
+  ///
+  /// \param pattern can be the value of a `BY` or `GET` field
+  /// \param subst is used to replace the "*" or "#" matched in the pattern 
string.
+  /// \return  Return the value associated to the key with a name obtained 
using the following rules:

Review Comment:
   ```suggestion
     /// \return  Returns the value associated to the key with a name obtained 
using the following rules:
   ```



##########
src/commands/cmd_key.cc:
##########
@@ -424,6 +424,115 @@ class CommandCopy : public Commander {
   bool replace_ = false;
 };
 
+template <bool ReadOnly>
+class CommandSort : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 2);
+    while (parser.Good()) {
+      if (parser.EatEqICase("BY")) {
+        sort_argument_.sortby = GET_OR_RET(parser.TakeStr());

Review Comment:
   Should we check `sortby.empty` here?



##########
src/storage/redis_db.cc:
##########
@@ -768,4 +773,197 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::optional<std::string> Database::lookupKeyByPattern(const std::string 
&pattern, const std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return std::nullopt;
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisHash) {
+      return std::nullopt;
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisString) {
+      return std::nullopt;
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
const SortArgument &args,
+                               std::vector<std::optional<std::string>> *elems, 
SortResult *res) {
+  // Obtain the length of the object to sort.
+  const std::string ns_key = AppendNamespacePrefix(key);
+  Metadata metadata(type, false);
+  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int vectorlen = static_cast<int>(metadata.size);
+
+  // Adjust the offset and count of the limit
+  int offset = args.offset >= vectorlen ? 0 : std::clamp(args.offset, 0, 
vectorlen - 1);
+  int count = args.offset >= vectorlen ? 0 : std::clamp(args.count, -1, 
vectorlen - offset);
+  if (count == -1) count = vectorlen - offset;

Review Comment:
   `count == -1` is a bit weird, can we make it more strict to `count < 0`?



##########
src/storage/redis_db.cc:
##########
@@ -768,4 +773,197 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::optional<std::string> Database::lookupKeyByPattern(const std::string 
&pattern, const std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return std::nullopt;
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisHash) {
+      return std::nullopt;
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisString) {
+      return std::nullopt;
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
const SortArgument &args,

Review Comment:
   `RedisType` better passing by value?



##########
src/commands/cmd_key.cc:
##########
@@ -424,6 +424,115 @@ class CommandCopy : public Commander {
   bool replace_ = false;
 };
 
+template <bool ReadOnly>
+class CommandSort : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 2);
+    while (parser.Good()) {
+      if (parser.EatEqICase("BY")) {
+        sort_argument_.sortby = GET_OR_RET(parser.TakeStr());
+
+        if (sort_argument_.sortby.find('*') == std::string::npos) {
+          sort_argument_.dontsort = true;
+        } else {
+          /* TODO:
+           * If BY is specified with a real pattern, we can't accept it in 
cluster mode,
+           * unless we can make sure the keys formed by the pattern are in the 
same slot
+           * as the key to sort.
+           * If BY is specified with a real pattern, we can't accept
+           * it if no full ACL key access is applied for this command. */
+        }
+      } else if (parser.EatEqICase("LIMIT")) {
+        sort_argument_.offset = GET_OR_RET(parser.template TakeInt<int>());
+        sort_argument_.count = GET_OR_RET(parser.template TakeInt<int>());
+      } else if (parser.EatEqICase("GET")) {
+        /* TODO:
+         * If GET is specified with a real pattern, we can't accept it in 
cluster mode,
+         * unless we can make sure the keys formed by the pattern are in the 
same slot
+         * as the key to sort. */
+        sort_argument_.getpatterns.push_back(GET_OR_RET(parser.TakeStr()));
+      } else if (parser.EatEqICase("ASC")) {
+        sort_argument_.desc = false;
+      } else if (parser.EatEqICase("DESC")) {
+        sort_argument_.desc = true;
+      } else if (parser.EatEqICase("ALPHA")) {
+        sort_argument_.alpha = true;
+      } else if (parser.EatEqICase("STORE")) {
+        if constexpr (ReadOnly) {
+          return {Status::RedisParseErr, "SORT_RO is read-only and does not 
support the STORE parameter"};
+        }
+        sort_argument_.storekey = GET_OR_RET(parser.TakeStr());
+      } else {
+        return parser.InvalidSyntax();
+      }
+    }
+
+    return Status::OK();
+  }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    redis::Database redis(srv->storage, conn->GetNamespace());
+    RedisType type = kRedisNone;
+    if (auto s = redis.Type(args_[1], &type); !s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    if (type != RedisType::kRedisList && type != RedisType::kRedisSet && type 
!= RedisType::kRedisZSet) {
+      *output = Error("WRONGTYPE Operation against a key holding the wrong 
kind of value");
+      return Status::OK();
+    }
+
+    /* When sorting a set with no sort specified, we must sort the output
+     * so the result is consistent across scripting and replication.
+     *
+     * The other types (list, sorted set) will retain their native order
+     * even if no sort order is requested, so they remain stable across
+     * scripting and replication.
+     *
+     * TODO: support CLIENT_SCRIPT flag, (!storekey_.empty() || c->flags & 
CLIENT_SCRIPT)) */

Review Comment:
   @git-hulk do we already have flag like this?



##########
src/storage/redis_db.cc:
##########
@@ -768,4 +773,197 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::optional<std::string> Database::lookupKeyByPattern(const std::string 
&pattern, const std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return std::nullopt;
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisHash) {
+      return std::nullopt;
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisString) {
+      return std::nullopt;
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
const SortArgument &args,
+                               std::vector<std::optional<std::string>> *elems, 
SortResult *res) {
+  // Obtain the length of the object to sort.
+  const std::string ns_key = AppendNamespacePrefix(key);
+  Metadata metadata(type, false);
+  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int vectorlen = static_cast<int>(metadata.size);
+
+  // Adjust the offset and count of the limit
+  int offset = args.offset >= vectorlen ? 0 : std::clamp(args.offset, 0, 
vectorlen - 1);
+  int count = args.offset >= vectorlen ? 0 : std::clamp(args.count, -1, 
vectorlen - offset);
+  if (count == -1) count = vectorlen - offset;
+
+  // Get the elements that need to be sorted
+  std::vector<std::string> str_vec;
+  if (count != 0) {
+    if (type == RedisType::kRedisList) {
+      auto list_db = redis::List(storage_, namespace_);
+
+      if (args.dontsort) {
+        if (args.desc) {
+          list_db.Range(key, -count - offset, -1 - offset, &str_vec);
+          std::reverse(str_vec.begin(), str_vec.end());
+        } else {
+          list_db.Range(key, offset, offset + count - 1, &str_vec);
+        }
+      } else {
+        list_db.Range(key, 0, -1, &str_vec);
+      }
+    } else if (type == RedisType::kRedisSet) {
+      auto set_db = redis::Set(storage_, namespace_);
+      set_db.Members(key, &str_vec);
+
+      if (args.dontsort) {
+        str_vec = std::vector(std::make_move_iterator(str_vec.begin() + 
offset),
+                              std::make_move_iterator(str_vec.begin() + offset 
+ count));
+      }
+    } else if (type == RedisType::kRedisZSet) {
+      auto zset_db = redis::ZSet(storage_, namespace_);
+      std::vector<MemberScore> member_scores;
+
+      if (args.dontsort) {
+        RangeRankSpec spec;
+        spec.start = offset;
+        spec.stop = offset + count - 1;
+        spec.reversed = args.desc;
+        zset_db.RangeByRank(key, spec, &member_scores, nullptr);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(std::move(member_score.member));
+        }
+      } else {
+        zset_db.GetAllMemberScores(key, &member_scores);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(std::move(member_score.member));
+        }
+      }
+    } else {
+      *res = SortResult::UNKNOWN_TYPE;
+      return s;
+    }
+  }
+
+  std::vector<RedisSortObject> sort_vec(str_vec.size());
+  for (size_t i = 0; i < str_vec.size(); ++i) {
+    sort_vec[i].obj = str_vec[i];
+  }
+
+  // Sort by BY, ALPHA, ASC/DESC
+  if (!args.dontsort) {
+    for (size_t i = 0; i < sort_vec.size(); ++i) {
+      std::optional<std::string> byval;
+      if (!args.sortby.empty()) {
+        byval = lookupKeyByPattern(args.sortby, str_vec[i]);
+        if (!byval.has_value()) continue;
+      } else {
+        byval = str_vec[i];
+      }
+
+      if (args.alpha && !args.sortby.empty()) {
+        sort_vec[i].v = byval.value();
+      } else if (!args.alpha && !byval.value().empty()) {
+        auto double_byval = ParseFloat<double>(byval.value());
+        if (!double_byval) {
+          *res = SortResult::DOUBLE_CONVERT_ERROR;
+          return rocksdb::Status::OK();

Review Comment:
   I think it can be ok, but we can denote the rocksdb return value here



##########
src/storage/redis_db.cc:
##########
@@ -768,4 +773,197 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::optional<std::string> Database::lookupKeyByPattern(const std::string 
&pattern, const std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return std::nullopt;
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisHash) {
+      return std::nullopt;
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisString) {
+      return std::nullopt;
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
const SortArgument &args,
+                               std::vector<std::optional<std::string>> *elems, 
SortResult *res) {
+  // Obtain the length of the object to sort.
+  const std::string ns_key = AppendNamespacePrefix(key);
+  Metadata metadata(type, false);
+  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int vectorlen = static_cast<int>(metadata.size);
+
+  // Adjust the offset and count of the limit
+  int offset = args.offset >= vectorlen ? 0 : std::clamp(args.offset, 0, 
vectorlen - 1);
+  int count = args.offset >= vectorlen ? 0 : std::clamp(args.count, -1, 
vectorlen - offset);
+  if (count == -1) count = vectorlen - offset;
+
+  // Get the elements that need to be sorted
+  std::vector<std::string> str_vec;
+  if (count != 0) {
+    if (type == RedisType::kRedisList) {
+      auto list_db = redis::List(storage_, namespace_);
+
+      if (args.dontsort) {
+        if (args.desc) {
+          list_db.Range(key, -count - offset, -1 - offset, &str_vec);
+          std::reverse(str_vec.begin(), str_vec.end());
+        } else {
+          list_db.Range(key, offset, offset + count - 1, &str_vec);
+        }
+      } else {
+        list_db.Range(key, 0, -1, &str_vec);
+      }
+    } else if (type == RedisType::kRedisSet) {
+      auto set_db = redis::Set(storage_, namespace_);
+      set_db.Members(key, &str_vec);
+
+      if (args.dontsort) {
+        str_vec = std::vector(std::make_move_iterator(str_vec.begin() + 
offset),
+                              std::make_move_iterator(str_vec.begin() + offset 
+ count));
+      }
+    } else if (type == RedisType::kRedisZSet) {
+      auto zset_db = redis::ZSet(storage_, namespace_);
+      std::vector<MemberScore> member_scores;
+
+      if (args.dontsort) {
+        RangeRankSpec spec;
+        spec.start = offset;
+        spec.stop = offset + count - 1;
+        spec.reversed = args.desc;
+        zset_db.RangeByRank(key, spec, &member_scores, nullptr);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(std::move(member_score.member));
+        }
+      } else {
+        zset_db.GetAllMemberScores(key, &member_scores);

Review Comment:
   Should we check the return value of these helper function? If get failed, we 
should return here rather than go to the other logic



##########
src/storage/redis_db.cc:
##########
@@ -768,4 +773,197 @@ rocksdb::Status Database::Copy(const std::string &key, 
const std::string &new_ke
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+std::optional<std::string> Database::lookupKeyByPattern(const std::string 
&pattern, const std::string &subst) {
+  if (pattern == "#") {
+    return subst;
+  }
+
+  auto match_pos = pattern.find('*');
+  if (match_pos == std::string::npos) {
+    return std::nullopt;
+  }
+
+  // hash field
+  std::string field;
+  auto arrow_pos = pattern.find("->", match_pos + 1);
+  if (arrow_pos != std::string::npos && arrow_pos + 2 < pattern.size()) {
+    field = pattern.substr(arrow_pos + 2);
+  }
+
+  std::string key = pattern.substr(0, match_pos + 1);
+  key.replace(match_pos, 1, subst);
+
+  std::string value;
+  if (!field.empty()) {
+    auto hash_db = redis::Hash(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = hash_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisHash) {
+      return std::nullopt;
+    }
+
+    hash_db.Get(key, field, &value);
+  } else {
+    auto string_db = redis::String(storage_, namespace_);
+    RedisType type = RedisType::kRedisNone;
+    if (auto s = string_db.Type(key, &type); !s.ok() || type != 
RedisType::kRedisString) {
+      return std::nullopt;
+    }
+    string_db.Get(key, &value);
+  }
+  return value;
+}
+
+rocksdb::Status Database::Sort(const RedisType &type, const std::string &key, 
const SortArgument &args,
+                               std::vector<std::optional<std::string>> *elems, 
SortResult *res) {
+  // Obtain the length of the object to sort.
+  const std::string ns_key = AppendNamespacePrefix(key);
+  Metadata metadata(type, false);
+  auto s = GetMetadata(GetOptions{}, {type}, ns_key, &metadata);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int vectorlen = static_cast<int>(metadata.size);
+
+  // Adjust the offset and count of the limit
+  int offset = args.offset >= vectorlen ? 0 : std::clamp(args.offset, 0, 
vectorlen - 1);
+  int count = args.offset >= vectorlen ? 0 : std::clamp(args.count, -1, 
vectorlen - offset);
+  if (count == -1) count = vectorlen - offset;
+
+  // Get the elements that need to be sorted
+  std::vector<std::string> str_vec;
+  if (count != 0) {
+    if (type == RedisType::kRedisList) {
+      auto list_db = redis::List(storage_, namespace_);
+
+      if (args.dontsort) {
+        if (args.desc) {
+          list_db.Range(key, -count - offset, -1 - offset, &str_vec);
+          std::reverse(str_vec.begin(), str_vec.end());
+        } else {
+          list_db.Range(key, offset, offset + count - 1, &str_vec);
+        }
+      } else {
+        list_db.Range(key, 0, -1, &str_vec);
+      }
+    } else if (type == RedisType::kRedisSet) {
+      auto set_db = redis::Set(storage_, namespace_);
+      set_db.Members(key, &str_vec);
+
+      if (args.dontsort) {
+        str_vec = std::vector(std::make_move_iterator(str_vec.begin() + 
offset),
+                              std::make_move_iterator(str_vec.begin() + offset 
+ count));
+      }
+    } else if (type == RedisType::kRedisZSet) {
+      auto zset_db = redis::ZSet(storage_, namespace_);
+      std::vector<MemberScore> member_scores;
+
+      if (args.dontsort) {
+        RangeRankSpec spec;
+        spec.start = offset;
+        spec.stop = offset + count - 1;
+        spec.reversed = args.desc;
+        zset_db.RangeByRank(key, spec, &member_scores, nullptr);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(std::move(member_score.member));
+        }
+      } else {
+        zset_db.GetAllMemberScores(key, &member_scores);
+
+        for (auto &member_score : member_scores) {
+          str_vec.emplace_back(std::move(member_score.member));
+        }
+      }
+    } else {
+      *res = SortResult::UNKNOWN_TYPE;
+      return s;
+    }
+  }
+
+  std::vector<RedisSortObject> sort_vec(str_vec.size());
+  for (size_t i = 0; i < str_vec.size(); ++i) {
+    sort_vec[i].obj = str_vec[i];
+  }
+
+  // Sort by BY, ALPHA, ASC/DESC
+  if (!args.dontsort) {
+    for (size_t i = 0; i < sort_vec.size(); ++i) {
+      std::optional<std::string> byval;

Review Comment:
   Why it's a `std::optional` here?
   
   ```c++
   std::string byval;
         if (!args.sortby.empty()) {
           auto lookup = lookupKeyByPattern(args.sortby, str_vec[i]);
           if (!lookup.has_value()) continue;
           byval = std::move(lookup.value());
         } else {
           byval = str_vec[i];
         }
   ```
   
   Could this be better?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to