torwig commented on code in PR #2120:
URL: https://github.com/apache/kvrocks/pull/2120#discussion_r1506997094


##########
src/types/redis_stream.cc:
##########
@@ -927,6 +971,147 @@ rocksdb::Status Stream::Range(const Slice &stream_name, 
const StreamRangeOptions
   return range(ns_key, metadata, options, entries);
 }
 
+rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, 
StreamRangeOptions &options,
+                                         std::vector<StreamEntry> *entries, 
std::string &group_name,
+                                         std::string &consumer_name, bool 
noack, bool latest) {
+  entries->clear();
+
+  if (options.with_count && options.count == 0) {
+    return rocksdb::Status::OK();
+  }
+
+  if (options.exclude_start && options.start.IsMaximum()) {
+    return rocksdb::Status::InvalidArgument("invalid start ID for the 
interval");
+  }
+
+  if (options.exclude_end && options.end.IsMinimum()) {
+    return rocksdb::Status::InvalidArgument("invalid end ID for the interval");
+  }
+
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  std::unique_lock<std::mutex> lock(*storage_->GetLockManager()->Get(ns_key));
+
+  StreamMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok()) {
+    return s.IsNotFound() ? rocksdb::Status::OK() : s;
+  }
+
+  std::string group_key = internalKeyFromGroupName(ns_key, metadata, 
group_name);
+  std::string get_group_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, 
&get_group_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " 
+ group_name + " for key name " +
+                                            stream_name.ToString());
+  }
+
+  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, 
group_name, consumer_name);
+  std::string get_consumer_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, 
&get_consumer_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    int created_number = 0;
+    lock.unlock();
+    s = CreateConsumer(stream_name, group_name, consumer_name, 
&created_number);

Review Comment:
   @Yangsx-1 Perhaps you need to move the code related to creating a consumer 
(but not acquiring a lock) to a helper function and use it here. Because unlock 
before creating a consumer is a hack and technically any operations can sneak 
between unlock->lock calls from other threads. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to