This is an automated email from the ASF dual-hosted git repository.
yangshixi pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 3bdf1ecf Rename last_idle_ms and last_active_ms in stream consumer metadata (#2341)
3bdf1ecf is described below
commit 3bdf1ecf3cb23007fbfb68b5275f644719f91f43
Author: Hauru <[email protected]>
AuthorDate: Thu May 30 17:15:48 2024 +0800
Rename last_idle_ms and last_active_ms in stream consumer metadata (#2341)
---
src/commands/cmd_stream.cc | 4 ++--
src/types/redis_stream.cc | 20 ++++++++++----------
src/types/redis_stream_base.h | 4 ++--
3 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 8468ee30..7b3bf54e 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -718,9 +718,9 @@ class CommandXInfo : public Commander {
output->append(redis::BulkString("pending"));
output->append(redis::Integer(it.second.pending_number));
output->append(redis::BulkString("idle"));
- output->append(redis::Integer(now_ms - it.second.last_idle_ms));
+ output->append(redis::Integer(now_ms - it.second.last_attempted_interaction_ms));
output->append(redis::BulkString("inactive"));
- output->append(redis::Integer(now_ms - it.second.last_active_ms));
+ output->append(redis::Integer(now_ms - it.second.last_successful_interaction_ms));
}
return Status::OK();
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index a3bdc63b..0ec71270 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -237,8 +237,8 @@ std::string Stream::consumerNameFromInternalKey(rocksdb::Slice key) const {
std::string Stream::encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata) {
std::string dst;
PutFixed64(&dst, consumer_metadata.pending_number);
- PutFixed64(&dst, consumer_metadata.last_idle_ms);
- PutFixed64(&dst, consumer_metadata.last_active_ms);
+ PutFixed64(&dst, consumer_metadata.last_attempted_interaction_ms);
+ PutFixed64(&dst, consumer_metadata.last_successful_interaction_ms);
return dst;
}
@@ -246,8 +246,8 @@ StreamConsumerMetadata Stream::decodeStreamConsumerMetadataValue(const std::stri
StreamConsumerMetadata consumer_metadata;
rocksdb::Slice input(value);
GetFixed64(&input, &consumer_metadata.pending_number);
- GetFixed64(&input, &consumer_metadata.last_idle_ms);
- GetFixed64(&input, &consumer_metadata.last_active_ms);
+ GetFixed64(&input, &consumer_metadata.last_attempted_interaction_ms);
+ GetFixed64(&input, &consumer_metadata.last_successful_interaction_ms);
return consumer_metadata;
}
@@ -398,8 +398,8 @@ rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const std::str
consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
}
auto now = util::GetTimeStampMS();
- consumer_metadata.last_idle_ms = now;
- consumer_metadata.last_active_ms = now;
+ consumer_metadata.last_attempted_interaction_ms = now;
+ consumer_metadata.last_successful_interaction_ms = now;
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisStream);
@@ -619,8 +619,8 @@ rocksdb::Status Stream::createConsumerWithoutLock(const Slice &stream_name, cons
StreamConsumerMetadata consumer_metadata;
auto now = util::GetTimeStampMS();
- consumer_metadata.last_idle_ms = now;
- consumer_metadata.last_active_ms = now;
+ consumer_metadata.last_attempted_interaction_ms = now;
+ consumer_metadata.last_successful_interaction_ms = now;
std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
std::string consumer_value = encodeStreamConsumerMetadataValue(consumer_metadata);
std::string get_consumer_value;
@@ -1286,8 +1286,8 @@ rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, StreamRangeOp
}
StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
auto now_ms = util::GetTimeStampMS();
- consumer_metadata.last_idle_ms = now_ms;
- consumer_metadata.last_active_ms = now_ms;
+ consumer_metadata.last_attempted_interaction_ms = now_ms;
+ consumer_metadata.last_successful_interaction_ms = now_ms;
if (latest) {
options.start = consumergroup_metadata.last_delivered_id;
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index efea48ec..889e4046 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -183,8 +183,8 @@ struct StreamConsumerGroupMetadata {
struct StreamConsumerMetadata {
uint64_t pending_number = 0;
- uint64_t last_idle_ms;
- uint64_t last_active_ms;
+ uint64_t last_attempted_interaction_ms;
+ uint64_t last_successful_interaction_ms;
};
enum class StreamSubkeyType {