This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new c4d9ce052 fix(core): potential data race between blocking command and
transaction (#2910)
c4d9ce052 is described below
commit c4d9ce052352b3d6452601c673bc4c097ca1ff32
Author: hulk <[email protected]>
AuthorDate: Tue Apr 29 23:52:01 2025 +0800
fix(core): potential data race between blocking command and transaction
(#2910)
Currently, EXEC is an exclusive command and puts the server into
transaction mode while it is running, but the blocking command's callback
function does not hold the corresponding lock.
As a result, the callback might run while the server is in another client's
transaction mode and break the write batch.
For example, client A sends the command `BRPOP list 10` and enters
blocking mode because the list has no data.
Client B then uses a transaction to send the commands `MULTI; LPUSH
list a; EXEC`.
Client A's callback is invoked after the `LPUSH` command is executed,
while EXEC is still running.
So the callback's operation (removing an element from the list) is
counted as part of the transaction.
It closes #2900.
It closes #2766.
---
src/commands/blocking_commander.h | 10 ++++++-
tests/gocase/unit/type/list/list_test.go | 46 ++++++++++++++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/src/commands/blocking_commander.h
b/src/commands/blocking_commander.h
index 2571e4560..619c5dbf1 100644
--- a/src/commands/blocking_commander.h
+++ b/src/commands/blocking_commander.h
@@ -24,6 +24,7 @@
#include "common/lock_manager.h"
#include "event_util.h"
#include "server/redis_connection.h"
+#include "server/server.h"
namespace redis {
@@ -70,6 +71,13 @@ class BlockingCommander : public Commander,
void OnWrite(bufferevent *bev) {
bool done{false};
{
+ // The blocking command should not be executed when the server is in
exclusive state,
+ // because it might have the data race when the server is in transaction
mode and run
+ // the callback here might cause the current execution also in
transaction mode.
+ //
+ // For more context, please refer to:
https://github.com/apache/kvrocks/issues/2900
+ auto concurrency = conn_->GetServer()->WorkConcurrencyGuard();
+
auto guard = GetLocks();
done = OnBlockingWrite();
}
@@ -125,7 +133,7 @@ class BlockingCommander : public Commander,
UnblockKeys();
auto bev = conn_->GetBufferEvent();
conn_->SetCB(bev);
- bufferevent_enable(bev, EV_READ);
+ bufferevent_enable(bev, EV_READ | EV_WRITE);
}
protected:
diff --git a/tests/gocase/unit/type/list/list_test.go
b/tests/gocase/unit/type/list/list_test.go
index 38854aa78..ff6a40ec6 100644
--- a/tests/gocase/unit/type/list/list_test.go
+++ b/tests/gocase/unit/type/list/list_test.go
@@ -21,10 +21,12 @@ package list
import (
"context"
+ "errors"
"fmt"
"math/rand"
"strconv"
"strings"
+ "sync"
"testing"
"time"
@@ -1513,3 +1515,47 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
})
}
}
+
+// TestPotentialDataRaceInBlockingCommand is to test blocking command's
callback
+// shouldn't have data race with concurrent transaction behavior.
+//
+// For more information, please refer to:
https://github.com/apache/kvrocks/issues/2900
+func TestPotentialDataRaceInBlockingCommand(t *testing.T) {
+ srv := util.StartServer(t, map[string]string{})
+ defer srv.Close()
+
+ ctx, cancelFn := context.WithCancel(context.Background())
+ rdb := srv.NewClient()
+ defer func() { require.NoError(t, rdb.Close()) }()
+
+ listKey := "mylist"
+ rdb.Del(ctx, listKey)
+ var wg sync.WaitGroup
+ for i := 0; i < 4; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ err := rdb.BLPop(ctx, 3500*time.Millisecond,
listKey).Err()
+ if errors.Is(err, redis.Nil) {
+ continue
+ } else if errors.Is(err, context.Canceled) {
+ return
+ } else {
+ require.NoError(t, err)
+ }
+ }
+ }()
+ }
+
+ for i := 0; i < 64; i++ {
+ pipe := rdb.TxPipeline()
+ pipe.LPush(ctx, listKey, "element")
+ _, err := pipe.Exec(ctx)
+ require.NoError(t, err)
+ time.Sleep(time.Millisecond * 100)
+ }
+
+ cancelFn()
+ wg.Wait()
+}