torwig commented on code in PR #2120:
URL: https://github.com/apache/kvrocks/pull/2120#discussion_r1508078485


##########
src/types/redis_stream.cc:
##########
@@ -927,6 +977,145 @@ rocksdb::Status Stream::Range(const Slice &stream_name, 
const StreamRangeOptions
   return range(ns_key, metadata, options, entries);
 }
 
+rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, 
StreamRangeOptions &options,
+                                         std::vector<StreamEntry> *entries, 
std::string &group_name,
+                                         std::string &consumer_name, bool 
noack, bool latest) {
+  entries->clear();
+
+  if (options.with_count && options.count == 0) {
+    return rocksdb::Status::OK();
+  }
+
+  if (options.exclude_start && options.start.IsMaximum()) {
+    return rocksdb::Status::InvalidArgument("invalid start ID for the 
interval");
+  }
+
+  if (options.exclude_end && options.end.IsMinimum()) {
+    return rocksdb::Status::InvalidArgument("invalid end ID for the interval");
+  }
+
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+
+  StreamMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok()) {
+    return s.IsNotFound() ? rocksdb::Status::OK() : s;
+  }
+
+  std::string group_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
+  std::string get_group_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, 
&get_group_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
+                                            stream_name.ToString());
+  }
+
+  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, 
group_name, consumer_name);
+  std::string get_consumer_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, 
&get_consumer_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    int created_number = 0;
+    s = createConsumerWithoutLock(stream_name, group_name, consumer_name, 
&created_number);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  StreamConsumerGroupMetadata consumergroup_metadata = 
decodeStreamConsumerGroupMetadataValue(get_group_value);
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, 
&get_consumer_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  StreamConsumerMetadata consumer_metadata = 
decodeStreamConsumerMetadataValue(get_consumer_value);
+  auto now = util::GetTimeStampMS();
+  consumer_metadata.last_idle = now;
+  consumer_metadata.last_active = now;
+
+  if (latest) {
+    options.start = consumergroup_metadata.last_delivered_id;
+    s = range(ns_key, metadata, options, entries);
+    if (!s.ok()) {
+      return s;
+    }
+    StreamEntryID maxid = {0, 0};
+    for (const auto &entry : *entries) {
+      StreamEntryID id;
+      Status st = ParseStreamEntryID(entry.key, &id);
+      if (!st.IsOK()) {
+        return rocksdb::Status::InvalidArgument(st.Msg());
+      }
+      if (id > maxid) {
+        maxid = id;
+      }
+      if (!noack) {
+        std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, 
metadata, group_name, id);
+        StreamPelEntry pel_entry = {0, 0, consumer_name};
+        std::string pel_value = encodeStreamPelEntryValue(pel_entry);
+        batch->Put(stream_cf_handle_, pel_key, pel_value);
+        consumergroup_metadata.entries_read += 1;
+        consumergroup_metadata.pending_number += 1;
+        consumer_metadata.pending_number += 1;
+      }
+    }
+    if (maxid > consumergroup_metadata.last_delivered_id) {
+      consumergroup_metadata.last_delivered_id = maxid;
+    }
+  } else {
+    std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, 
metadata, group_name, StreamEntryID::Minimum());
+    std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, 
group_name, StreamEntryID::Maximum());
+
+    rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+    LatestSnapShot ss(storage_);
+    read_options.snapshot = ss.GetSnapShot();
+    rocksdb::Slice upper_bound(end_key);
+    read_options.iterate_upper_bound = &upper_bound;
+    rocksdb::Slice lower_bound(prefix_key);
+    read_options.iterate_lower_bound = &lower_bound;
+
+    auto iter = util::UniqueIterator(storage_, read_options, 
stream_cf_handle_);
+    uint64_t count = 0;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) 
{
+        std::string tmp_group_name;
+        StreamEntryID entry_id = 
groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
+        if (tmp_group_name != group_name) continue;
+        StreamPelEntry pel_entry = 
decodeStreamPelEntryValue(iter->value().ToString());
+        if (pel_entry.consumer_name != consumer_name) continue;
+        std::string raw_value;
+        rocksdb::Status st = getEntryRawValue(ns_key, metadata, entry_id, 
&raw_value);
+        if (!st.ok() && !st.IsNotFound()) {
+          return st;
+        }
+        std::vector<std::string> values;

Review Comment:
   What will be if `st.IsNotFound()` is `true`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to