This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new cbaa28980 fix(stream): XPENDING won't return the idle and deliver
count when reading with id '>' (#3185)
cbaa28980 is described below
commit cbaa2898049739c4d1dd1a14aa477b2147e598af
Author: hulk <[email protected]>
AuthorDate: Thu Sep 18 09:56:04 2025 +0800
fix(stream): XPENDING won't return the idle and deliver count when reading
with id '>' (#3185)
---
src/commands/cmd_stream.cc | 3 +-
src/types/redis_stream.cc | 2 +-
tests/gocase/unit/type/stream/stream_test.go | 72 ++++++++++++++++++++++++++++
3 files changed, 75 insertions(+), 2 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index a2ef99509..f4c04a21c 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -932,12 +932,13 @@ class CommandXPending : public Commander {
static Status SendExtResults([[maybe_unused]] Connection *conn, std::string
*output,
std::vector<StreamNACK> &ext_results) {
+ auto now = util::GetTimeStampMS();
output->append(redis::MultiLen(ext_results.size()));
for (const auto &entry : ext_results) {
output->append(redis::MultiLen(4));
output->append(redis::BulkString(entry.id.ToString()));
output->append(redis::BulkString(entry.pel_entry.consumer_name));
- output->append(redis::Integer(entry.pel_entry.last_delivery_time_ms));
+ output->append(redis::Integer(now -
entry.pel_entry.last_delivery_time_ms));
output->append(redis::Integer(entry.pel_entry.last_delivery_count));
}
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index df9284c2d..e7082648b 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -1505,7 +1505,7 @@ rocksdb::Status Stream::RangeWithPending(engine::Context
&ctx, const Slice &stre
}
if (!noack) {
std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key,
metadata, group_name, id);
- StreamPelEntry pel_entry = {0, 0, consumer_name};
+ StreamPelEntry pel_entry = {now_ms, 1, consumer_name};
std::string pel_value = encodeStreamPelEntryValue(pel_entry);
s = batch->Put(stream_cf_handle_, pel_key, pel_value);
if (!s.ok()) return s;
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index 6e8d45dba..e97cde87e 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -2289,6 +2289,78 @@ func TestStreamOffset(t *testing.T) {
}}, r)
}
})
+
+ t.Run("XPENDING idle time and delivered count, issue #3178", func(t
*testing.T) {
+ streamName := "mystream-3178"
+ groupName := "mygroup-3178"
+ consumerName := "myconsumer-3178"
+
+ require.NoError(t, rdb.Del(ctx, streamName).Err())
+ require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName,
groupName, "$").Err())
+ msgID := "1-0"
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+ Stream: streamName,
+ ID: msgID,
+ Values: []string{"data", "value"},
+ }).Err())
+
+ result, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, ">"},
+ Count: 1,
+ NoAck: false,
+ }).Result()
+ require.NoError(t, err)
+ require.Len(t, result, 1)
+ require.Len(t, result[0].Messages, 1)
+ require.Equal(t, msgID, result[0].Messages[0].ID)
+
+ // Wait to allow idle time to accumulate
+ time.Sleep(100 * time.Millisecond)
+ pendingEntries, err := rdb.XPendingExt(ctx,
&redis.XPendingExtArgs{
+ Stream: streamName,
+ Group: groupName,
+ Start: "-",
+ End: "+",
+ Count: 10,
+ Consumer: consumerName,
+ }).Result()
+ require.NoError(t, err)
+ require.Len(t, pendingEntries, 1)
+
+ pendingEntry := pendingEntries[0]
+ require.Equal(t, msgID, pendingEntry.ID)
+ require.Equal(t, consumerName, pendingEntry.Consumer)
+ require.Greater(t, pendingEntry.Idle, time.Millisecond)
+ require.Less(t, pendingEntry.Idle, 10*time.Second)
+ require.EqualValues(t, 1, pendingEntry.RetryCount)
+
+ // Read the same message again to increase delivery count
+ _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+ Group: groupName,
+ Consumer: consumerName,
+ Streams: []string{streamName, "0"},
+ Count: 1,
+ NoAck: false,
+ }).Result()
+ require.NoError(t, err)
+ time.Sleep(100 * time.Millisecond)
+ pendingEntries, err = rdb.XPendingExt(ctx,
&redis.XPendingExtArgs{
+ Stream: streamName,
+ Group: groupName,
+ Start: "-",
+ End: "+",
+ Count: 10,
+ Consumer: consumerName,
+ }).Result()
+ require.NoError(t, err)
+ require.Len(t, pendingEntries, 1)
+ pendingEntry = pendingEntries[0]
+ require.EqualValues(t, 2, pendingEntry.RetryCount)
+ require.Greater(t, pendingEntry.Idle, time.Millisecond)
+ require.Less(t, pendingEntry.Idle, 10*time.Second)
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {