julic20s commented on code in PR #1901:
URL: https://github.com/apache/kvrocks/pull/1901#discussion_r1401423417
##########
src/types/redis_bitmap.cc:
##########
@@ -538,6 +545,281 @@ rocksdb::Status Bitmap::BitOp(BitOpFlags op_flag, const
std::string &op_name, co
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+namespace {
+using SegmentCache = std::pair<std::string, std::string>;
+
+// used to simplify accessing bitfield that occupies multiple continuous
segments.
+// you can access multiple segments like accessing in one big segments.
+class BitmapSegmentsView {
+ public:
+ BitmapSegmentsView() = default;
+ explicit BitmapSegmentsView(std::vector<SegmentCache *> segments, bool
expand_in_advance = true)
+ : segments_(std::move(segments)) {
+ if (expand_in_advance && segments_.size() > 1) {
+ // expand segments during iteration (not in advance) maybe increase
allocation times.
+ // we assume that all end of each segment except last one will be
accessed.
+ // note: segments_.size() is unsigned.
+ for (size_t i = 0; i < segments_.size() - 1; ++i) {
+ ExpandBitmapSegment(&segments_[i]->second, kBitmapSegmentBytes);
+ }
+ }
+ }
+
+ char &operator[](uint32_t index) noexcept {
+ std::string &content = segments_[index / kBitmapSegmentBytes]->second;
+ index %= kBitmapSegmentBytes;
+ ExpandBitmapSegment(&content, index + 1);
+ return content[index];
+ }
+
+ const char &operator[](uint32_t index) const noexcept {
+ std::string &content = segments_[index / kBitmapSegmentBytes]->second;
+ index %= kBitmapSegmentBytes;
+ ExpandBitmapSegment(&content, index + 1);
+ return content[index];
+ }
+
+ SegmentCache &GetSegment(uint32_t index) noexcept { return
*segments_[index]; }
+
+ const SegmentCache &GetSegment(uint32_t index) const noexcept { return
*segments_[index]; }
+
+ size_t LastSegmentSize() const noexcept { return
segments_.back()->second.size(); }
+
+ size_t SegmentsCount() const noexcept { return segments_.size(); }
+
+ auto SegmentBegin() noexcept { return segments_.begin(); }
+
+ auto SegmentEnd() noexcept { return segments_.end(); }
+
+ private:
+ std::vector<SegmentCache *> segments_;
+};
+} // namespace
+
+template <bool ReadOnly>
+rocksdb::Status Bitmap::bitfield(const Slice &user_key, const
std::vector<BitfieldOperation> &ops,
+ std::vector<std::string> *rets) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+
+ LockGuard guard;
+ if constexpr (!ReadOnly) {
+ guard = LockGuard(storage_->GetLockManager(), ns_key);
+ }
+
+ BitmapMetadata metadata;
+ std::string raw_value;
+ auto s = GetMetadata(ns_key, &metadata, &raw_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+
+ if (metadata.Type() == RedisType::kRedisString) {
+ if constexpr (ReadOnly) {
+ s = BitmapString::BitfieldReadOnly(ns_key, raw_value, ops, rets);
+ } else {
+ s = BitmapString(storage_, namespace_).Bitfield(ns_key, &raw_value, ops,
rets);
+ }
+ return s;
+ }
+
+ if (metadata.Type() != RedisType::kRedisBitmap) {
+ return rocksdb::Status::InvalidArgument("The value is not a bitmap or
string.");
+ }
+
+ std::unordered_map<uint32_t, SegmentCache> seg_cache;
+ auto get_segment = [&](uint32_t index, SegmentCache **res) {
+ auto [seg_itor, no_cache] = seg_cache.try_emplace(index);
+ auto &[sub_key, value] = seg_itor->second;
+ if (no_cache) {
+ auto sub_sub_key = std::to_string(index * kBitmapSegmentBytes);
+ sub_key = InternalKey(ns_key, sub_sub_key, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), sub_key,
&value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ }
+ *res = &seg_itor->second;
+ return rocksdb::Status::OK();
+ };
+
+ auto init_view = [&get_segment](uint32_t offset, uint32_t bits,
BitmapSegmentsView *view) {
+ uint32_t first_segment = offset / kBitmapSegmentBits;
+ uint32_t last_segment = (offset + bits - 1) / kBitmapSegmentBits;
+
+ std::vector<SegmentCache *> segments(last_segment - first_segment + 1);
+ for (auto &seg : segments) {
+ auto seg_status = get_segment(first_segment++, &seg);
+ if (!seg_status.ok()) {
+ return seg_status;
+ }
+ }
+ *view = BitmapSegmentsView(std::move(segments));
+ return rocksdb::Status::OK();
+ };
+
+ // get bitfield value from entire bitmap
+ auto get = [&init_view](uint32_t offset, BitfieldEncoding enc, uint64_t
*res) {
+ BitmapSegmentsView view;
+ auto view_status = init_view(offset, enc.Bits(), &view);
+ if (!view_status.ok()) {
+ return view_status;
+ }
+
+ uint32_t first_bit_segment_offset = offset % kBitmapSegmentBits;
+ if (enc.IsSigned()) {
+ *res = GetSignedBitfield(view, first_bit_segment_offset, enc.Bits());
+ } else {
+ *res = GetUnsignedBitfield(view, first_bit_segment_offset, enc.Bits());
+ }
+
+ return rocksdb::Status::OK();
+ };
+
+ if constexpr (ReadOnly) {
+ std::string ret;
+ for (BitfieldOperation op : ops) {
+ if (op.type != BitfieldOperation::Type::kGet) {
+ return rocksdb::Status::InvalidArgument("Write bitfield in read-only
mode.");
+ }
+
+ uint64_t unsigned_old_value = 0;
+ s = get(op.offset, op.encoding, &unsigned_old_value);
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (op.encoding.IsSigned()) {
+ ret =
redis::Integer(CastToSignedWithoutBitChanges(unsigned_old_value));
+ } else {
+ ret = redis::Integer(unsigned_old_value);
+ }
+
+ rets->push_back(std::move(ret));
+ }
+ return rocksdb::Status::OK();
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ bitfieldWriteAheadLog(batch, ops);
+
+ // set bitfield value among segments
+ auto set = [&](uint32_t offset, BitfieldEncoding enc,
BitfieldOverflowBehavior overflow, uint64_t value) {
+ BitmapSegmentsView view;
+ s = init_view(offset, enc.Bits(), &view);
+ if (!s.ok()) {
+ return s;
+ }
+
+ uint32_t first_bit_segment_offset = offset % kBitmapSegmentBits;
+ SetBitfield(view, first_bit_segment_offset, enc.Bits(), value);
+
+ for (auto segment = view.SegmentBegin(); segment != view.SegmentEnd();
++segment) {
+ batch->Put((*segment)->first, (*segment)->second);
+ }
+
+ uint64_t segments_cnt = view.SegmentsCount();
+ uint64_t used_size = (offset / 8) + (segments_cnt - 1) *
kBitmapSegmentBytes + view.LastSegmentSize();
+ if (metadata.size < used_size) {
+ metadata.size = used_size;
+ std::string bytes;
+ metadata.Encode(&bytes);
+ batch->Put(metadata_cf_handle_, ns_key, bytes);
+ }
+
+ return rocksdb::Status::OK();
+ };
+
+ std::string ret;
+ for (BitfieldOperation op : ops) {
+ uint64_t unsigned_old_value = 0;
+ s = get(op.offset, op.encoding, &unsigned_old_value);
+ if (!s.ok()) {
+ return s;
+ }
+
+ uint64_t unsigned_new_value = 0;
+ if (BitfieldOp(op, unsigned_old_value, &unsigned_new_value)) {
+ if (op.type != BitfieldOperation::Type::kGet) {
+ s = set(op.offset, op.encoding, op.overflow, unsigned_new_value);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ if (op.type == BitfieldOperation::Type::kSet) {
+ unsigned_new_value = unsigned_old_value;
+ }
+
+ if (op.encoding.IsSigned()) {
+ ret =
redis::Integer(CastToSignedWithoutBitChanges(unsigned_new_value));
+ } else {
+ ret = redis::Integer(unsigned_new_value);
+ }
+ } else {
+ ret = redis::NilString();
+ }
Review Comment:
Should I use std::variant<std::nullptr_t, uint64_t, int64_t> or define a new
type to handle this? There are 3 types of values that could be stored in the
result list (null, unsigned, and signed). Both of the approaches above can store
each result in 16 bytes, but neither seems intuitive enough. (A variant is hard
to read; a new type is hard to reuse, since it would fit only the
`Bitmap::Bitfield` function.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]