Yangsx-1 commented on code in PR #2120:
URL: https://github.com/apache/kvrocks/pull/2120#discussion_r1506051878
##########
src/types/redis_stream.cc:
##########
@@ -927,6 +971,143 @@ rocksdb::Status Stream::Range(const Slice &stream_name,
const StreamRangeOptions
return range(ns_key, metadata, options, entries);
}
+rocksdb::Status Stream::RangeWithPending(const Slice &stream_name,
StreamRangeOptions &options,
+ std::vector<StreamEntry> *entries,
std::string &group_name,
+ std::string &consumer_name, bool
noack, bool latest) {
+ entries->clear();
+
+ if (options.with_count && options.count == 0) {
+ return rocksdb::Status::OK();
+ }
+
+ if (options.exclude_start && options.start.IsMaximum()) {
+ return rocksdb::Status::InvalidArgument("invalid start ID for the
interval");
+ }
+
+ if (options.exclude_end && options.end.IsMinimum()) {
+ return rocksdb::Status::InvalidArgument("invalid end ID for the interval");
+ }
+
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+
+ StreamMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok()) {
+ return s.IsNotFound() ? rocksdb::Status::OK() : s;
+ }
+
+ std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string get_group_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
+ stream_name.ToString());
+ }
+
+ std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
+ std::string get_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ int created_number = 0;
+ s = CreateConsumer(stream_name, group_name, consumer_name,
&created_number);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ LockGuard guard(storage_->GetLockManager(), ns_key);
Review Comment:
We could, but I don't recommend it. Other threads may be operating on the
stream, the stream's groups, and a group's consumers at the same time, and all
of those other operations take the lock on the stream key. If we took a lock on
the group key here instead, it could lead to potential problems.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]