jihuayu commented on code in PR #2120:
URL: https://github.com/apache/kvrocks/pull/2120#discussion_r1504228590
##########
src/types/redis_stream.cc:
##########
@@ -927,6 +971,143 @@ rocksdb::Status Stream::Range(const Slice &stream_name,
const StreamRangeOptions
return range(ns_key, metadata, options, entries);
}
+rocksdb::Status Stream::RangeWithPending(const Slice &stream_name,
StreamRangeOptions &options,
+ std::vector<StreamEntry> *entries,
std::string &group_name,
+ std::string &consumer_name, bool
noack, bool latest) {
+ entries->clear();
+
+ if (options.with_count && options.count == 0) {
+ return rocksdb::Status::OK();
+ }
+
+ if (options.exclude_start && options.start.IsMaximum()) {
+ return rocksdb::Status::InvalidArgument("invalid start ID for the
interval");
+ }
+
+ if (options.exclude_end && options.end.IsMinimum()) {
+ return rocksdb::Status::InvalidArgument("invalid end ID for the interval");
+ }
+
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+
+ StreamMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok()) {
+ return s.IsNotFound() ? rocksdb::Status::OK() : s;
+ }
+
+ std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string get_group_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
+ stream_name.ToString());
+ }
+
+ std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
+ std::string get_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ int created_number = 0;
+ s = CreateConsumer(stream_name, group_name, consumer_name,
&created_number);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisStream);
+ batch->PutLogData(log_data.Encode());
+
+ StreamConsumerGroupMetadata consumergroup_metadata =
decodeStreamConsumerGroupMetadataValue(get_group_value);
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
+ StreamConsumerMetadata consumer_metadata =
decodeStreamConsumerMetadataValue(get_consumer_value);
+ auto now = util::GetTimeStampMS();
+ consumer_metadata.last_idle = now;
+ consumer_metadata.last_active = now;
+
+ if (latest) {
+ options.start = consumergroup_metadata.last_delivered_id;
+ s = range(ns_key, metadata, options, entries);
+ if (!s.ok()) {
+ return s;
+ }
+ StreamEntryID maxid = {0, 0};
+ for (const auto &entry : *entries) {
+ StreamEntryID id;
+ Status st = ParseStreamEntryID(entry.key, &id);
+ if (!st.IsOK()) {
+ return rocksdb::Status::InvalidArgument(st.Msg());
+ }
+ if (id > maxid) {
+ maxid = id;
+ }
+ if (!noack) {
+ std::string pel_key = internalPelKeyFromGroupAndId(ns_key, metadata,
group_name, id);
+ StreamPelEntry pel_entry = {0, 0, consumer_name};
+ std::string pel_value = encodeStreamPelEntryValue(pel_entry);
+ batch->Put(stream_cf_handle_, pel_key, pel_value);
+ consumergroup_metadata.entries_read += 1;
+ consumergroup_metadata.pending_number += 1;
+ consumer_metadata.pending_number += 1;
+ }
+ }
+ if (maxid > consumergroup_metadata.last_delivered_id) {
+ consumergroup_metadata.last_delivered_id = maxid;
+ }
+ } else {
+ std::string next_version_prefix_key =
+ InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+ std::string prefix_key = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
Review Comment:
Can we scan only this consumer group's PEL?
Entries in the same consumer group should be consecutive.
```c++
std::string prefix_key =
internalPelKeyFromGroupAndId(ns_key,metadata,group_name,StreamEntryID::Minimum());
std::string end_key =
internalPelKeyFromGroupAndId(ns_key,metadata,group_name,StreamEntryID::Maximum());
```
##########
src/types/redis_stream.cc:
##########
@@ -927,6 +971,143 @@ rocksdb::Status Stream::Range(const Slice &stream_name,
const StreamRangeOptions
return range(ns_key, metadata, options, entries);
}
+rocksdb::Status Stream::RangeWithPending(const Slice &stream_name,
StreamRangeOptions &options,
+ std::vector<StreamEntry> *entries,
std::string &group_name,
+ std::string &consumer_name, bool
noack, bool latest) {
+ entries->clear();
+
+ if (options.with_count && options.count == 0) {
+ return rocksdb::Status::OK();
+ }
+
+ if (options.exclude_start && options.start.IsMaximum()) {
+ return rocksdb::Status::InvalidArgument("invalid start ID for the
interval");
+ }
+
+ if (options.exclude_end && options.end.IsMinimum()) {
+ return rocksdb::Status::InvalidArgument("invalid end ID for the interval");
+ }
+
+ std::string ns_key = AppendNamespacePrefix(stream_name);
+
+ StreamMetadata metadata(false);
+ rocksdb::Status s = GetMetadata(ns_key, &metadata);
+ if (!s.ok()) {
+ return s.IsNotFound() ? rocksdb::Status::OK() : s;
+ }
+
+ std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string get_group_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key,
&get_group_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group "
+ group_name + " for key name " +
+ stream_name.ToString());
+ }
+
+ std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata,
group_name, consumer_name);
+ std::string get_consumer_value;
+ s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key,
&get_consumer_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ if (s.IsNotFound()) {
+ int created_number = 0;
+ s = CreateConsumer(stream_name, group_name, consumer_name,
&created_number);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ LockGuard guard(storage_->GetLockManager(), ns_key);
Review Comment:
It looks like we don't modify the stream metadata here.
Can we lock only the consumer group?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]