This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new cbaa28980 fix(stream): XPENDING won't return the idle and deliver 
count when reading with id '<' (#3185)
cbaa28980 is described below

commit cbaa2898049739c4d1dd1a14aa477b2147e598af
Author: hulk <[email protected]>
AuthorDate: Thu Sep 18 09:56:04 2025 +0800

    fix(stream): XPENDING won't return the idle and deliver count when reading 
with id '<' (#3185)
---
 src/commands/cmd_stream.cc                   |  3 +-
 src/types/redis_stream.cc                    |  2 +-
 tests/gocase/unit/type/stream/stream_test.go | 72 ++++++++++++++++++++++++++++
 3 files changed, 75 insertions(+), 2 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index a2ef99509..f4c04a21c 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -932,12 +932,13 @@ class CommandXPending : public Commander {
 
   static Status SendExtResults([[maybe_unused]] Connection *conn, std::string 
*output,
                                std::vector<StreamNACK> &ext_results) {
+    auto now = util::GetTimeStampMS();
     output->append(redis::MultiLen(ext_results.size()));
     for (const auto &entry : ext_results) {
       output->append(redis::MultiLen(4));
       output->append(redis::BulkString(entry.id.ToString()));
       output->append(redis::BulkString(entry.pel_entry.consumer_name));
-      output->append(redis::Integer(entry.pel_entry.last_delivery_time_ms));
+      output->append(redis::Integer(now - 
entry.pel_entry.last_delivery_time_ms));
       output->append(redis::Integer(entry.pel_entry.last_delivery_count));
     }
 
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index df9284c2d..e7082648b 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -1505,7 +1505,7 @@ rocksdb::Status Stream::RangeWithPending(engine::Context 
&ctx, const Slice &stre
       }
       if (!noack) {
         std::string pel_key = internalPelKeyFromGroupAndEntryId(ns_key, 
metadata, group_name, id);
-        StreamPelEntry pel_entry = {0, 0, consumer_name};
+        StreamPelEntry pel_entry = {now_ms, 1, consumer_name};
         std::string pel_value = encodeStreamPelEntryValue(pel_entry);
         s = batch->Put(stream_cf_handle_, pel_key, pel_value);
         if (!s.ok()) return s;
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index 6e8d45dba..e97cde87e 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -2289,6 +2289,78 @@ func TestStreamOffset(t *testing.T) {
                        }}, r)
                }
        })
+
+       t.Run("XPENDING idle time and delivered count, issue #3178", func(t 
*testing.T) {
+               streamName := "mystream-3178"
+               groupName := "mygroup-3178"
+               consumerName := "myconsumer-3178"
+
+               require.NoError(t, rdb.Del(ctx, streamName).Err())
+               require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, 
groupName, "$").Err())
+               msgID := "1-0"
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+                       Stream: streamName,
+                       ID:     msgID,
+                       Values: []string{"data", "value"},
+               }).Err())
+
+               result, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, ">"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Result()
+               require.NoError(t, err)
+               require.Len(t, result, 1)
+               require.Len(t, result[0].Messages, 1)
+               require.Equal(t, msgID, result[0].Messages[0].ID)
+
+               // Wait to allow idle time to accumulate
+               time.Sleep(100 * time.Millisecond)
+               pendingEntries, err := rdb.XPendingExt(ctx, 
&redis.XPendingExtArgs{
+                       Stream:   streamName,
+                       Group:    groupName,
+                       Start:    "-",
+                       End:      "+",
+                       Count:    10,
+                       Consumer: consumerName,
+               }).Result()
+               require.NoError(t, err)
+               require.Len(t, pendingEntries, 1)
+
+               pendingEntry := pendingEntries[0]
+               require.Equal(t, msgID, pendingEntry.ID)
+               require.Equal(t, consumerName, pendingEntry.Consumer)
+               require.Greater(t, pendingEntry.Idle, time.Millisecond)
+               require.Less(t, pendingEntry.Idle, 10*time.Second)
+               require.EqualValues(t, 1, pendingEntry.RetryCount)
+
+               // Read the same message again to increase delivery count
+               _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+                       Group:    groupName,
+                       Consumer: consumerName,
+                       Streams:  []string{streamName, "0"},
+                       Count:    1,
+                       NoAck:    false,
+               }).Result()
+               require.NoError(t, err)
+               time.Sleep(100 * time.Millisecond)
+               pendingEntries, err = rdb.XPendingExt(ctx, 
&redis.XPendingExtArgs{
+                       Stream:   streamName,
+                       Group:    groupName,
+                       Start:    "-",
+                       End:      "+",
+                       Count:    10,
+                       Consumer: consumerName,
+               }).Result()
+               require.NoError(t, err)
+               require.Len(t, pendingEntries, 1)
+               pendingEntry = pendingEntries[0]
+               require.EqualValues(t, 2, pendingEntry.RetryCount)
+               require.Greater(t, pendingEntry.Idle, time.Millisecond)
+               require.Less(t, pendingEntry.Idle, 10*time.Second)
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to