This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new c4d9ce052 fix(core): potential data race between blocking command and 
transaction (#2910)
c4d9ce052 is described below

commit c4d9ce052352b3d6452601c673bc4c097ca1ff32
Author: hulk <[email protected]>
AuthorDate: Tue Apr 29 23:52:01 2025 +0800

    fix(core): potential data race between blocking command and transaction 
(#2910)
    
    Currently, the EXEC is an exclusive command and will enter the transaction 
mode
    when running, but the blocking command's callback function doesn't have 
this lock.
    So the callback might get trapped in the wrong transaction mode and break 
the write batch.
    
    For example, the client A sends the command: `BRPOP list 10` and it will 
enter the blocking mode due to no data.
    And the client B uses the transaction to send the command: `MULTI; LPUSH 
list a; EXEC`.
    Then the client A's callback will be invoked after the `LPUSH` command is 
executed, while the EXEC is still running.
    So the callback's operation(removing an element from the list) will be 
counted in the transaction.
    
    It closes #2900.
    It closes #2766.
---
 src/commands/blocking_commander.h        | 10 ++++++-
 tests/gocase/unit/type/list/list_test.go | 46 ++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/src/commands/blocking_commander.h 
b/src/commands/blocking_commander.h
index 2571e4560..619c5dbf1 100644
--- a/src/commands/blocking_commander.h
+++ b/src/commands/blocking_commander.h
@@ -24,6 +24,7 @@
 #include "common/lock_manager.h"
 #include "event_util.h"
 #include "server/redis_connection.h"
+#include "server/server.h"
 
 namespace redis {
 
@@ -70,6 +71,13 @@ class BlockingCommander : public Commander,
   void OnWrite(bufferevent *bev) {
     bool done{false};
     {
+      // The blocking command should not be executed when the server is in 
exclusive state,
+      // because it might have the data race when the server is in transaction 
mode and run
+      // the callback here might cause the current execution also in 
transaction mode.
+      //
+      // For more context, please refer to: 
https://github.com/apache/kvrocks/issues/2900
+      auto concurrency = conn_->GetServer()->WorkConcurrencyGuard();
+
       auto guard = GetLocks();
       done = OnBlockingWrite();
     }
@@ -125,7 +133,7 @@ class BlockingCommander : public Commander,
     UnblockKeys();
     auto bev = conn_->GetBufferEvent();
     conn_->SetCB(bev);
-    bufferevent_enable(bev, EV_READ);
+    bufferevent_enable(bev, EV_READ | EV_WRITE);
   }
 
  protected:
diff --git a/tests/gocase/unit/type/list/list_test.go 
b/tests/gocase/unit/type/list/list_test.go
index 38854aa78..ff6a40ec6 100644
--- a/tests/gocase/unit/type/list/list_test.go
+++ b/tests/gocase/unit/type/list/list_test.go
@@ -21,10 +21,12 @@ package list
 
 import (
        "context"
+       "errors"
        "fmt"
        "math/rand"
        "strconv"
        "strings"
+       "sync"
        "testing"
        "time"
 
@@ -1513,3 +1515,47 @@ func testList(t *testing.T, configs 
util.KvrocksServerConfigs) {
                })
        }
 }
+
+// TestPotentialDataRaceInBlockingCommand is to test blocking command's 
callback
+// shouldn't have data race with concurrent transaction behavior.
+//
+// For more information, please refer to: 
https://github.com/apache/kvrocks/issues/2900
+func TestPotentialDataRaceInBlockingCommand(t *testing.T) {
+       srv := util.StartServer(t, map[string]string{})
+       defer srv.Close()
+
+       ctx, cancelFn := context.WithCancel(context.Background())
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       listKey := "mylist"
+       rdb.Del(ctx, listKey)
+       var wg sync.WaitGroup
+       for i := 0; i < 4; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for {
+                               err := rdb.BLPop(ctx, 3500*time.Millisecond, 
listKey).Err()
+                               if errors.Is(err, redis.Nil) {
+                                       continue
+                               } else if errors.Is(err, context.Canceled) {
+                                       return
+                               } else {
+                                       require.NoError(t, err)
+                               }
+                       }
+               }()
+       }
+
+       for i := 0; i < 64; i++ {
+               pipe := rdb.TxPipeline()
+               pipe.LPush(ctx, listKey, "element")
+               _, err := pipe.Exec(ctx)
+               require.NoError(t, err)
+               time.Sleep(time.Millisecond * 100)
+       }
+
+       cancelFn()
+       wg.Wait()
+}

Reply via email to