torwig commented on code in PR #1855:
URL: https://github.com/apache/kvrocks/pull/1855#discussion_r1375265538
##########
src/server/worker.cc:
##########
@@ -344,6 +344,31 @@ redis::Connection *Worker::removeConnection(int fd) {
return conn;
}
+// MigrateConnection moves the connection to another worker
+// when reducing the number of workers.
+//
+// To make it simple, we would close the connection if it's
+// blocked on a key or stream.
+void Worker::MigrateConnection(Worker *target, redis::Connection *conn) {
+ if (!target || !conn) return;
+ if (conn->current_cmd != nullptr && conn->current_cmd->IsBlocking()) {
+ // don't need to close the connection since destroy worker thread will close it
+ return;
+ }
+
Review Comment:
Could a blocking command be received after the previous check but before
the connection is actually migrated?
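To make the concern concrete, here is a minimal sketch of one way to close such a check-then-act window: do the blocking check and the ownership change under the same lock. Everything in it (`Conn`, `BeginBlockingCommand`, `TryMigrate`, the `owner` field) is a hypothetical stand-in for illustration, not the kvrocks API, and it assumes that commands and migration can run on different threads, which may not hold for the actual worker design.

```cpp
#include <mutex>

// Hypothetical stand-in for a connection; not the real redis::Connection.
struct Conn {
  std::mutex mu;
  bool blocking = false;  // true while a blocking command is in flight
  int owner = 0;          // id of the worker that currently owns the connection
};

// Marks the connection as blocked, as a blocking command would on arrival.
// Returns the id of the worker that accepted the command.
int BeginBlockingCommand(Conn &c) {
  std::lock_guard<std::mutex> guard(c.mu);
  c.blocking = true;
  return c.owner;
}

// Checks the blocking state and changes the owner while holding one lock,
// so a blocking command cannot be accepted between the check and the move.
// Returns false (and leaves the owner unchanged) if the connection is blocked.
bool TryMigrate(Conn &c, int target_worker) {
  std::lock_guard<std::mutex> guard(c.mu);
  if (c.blocking) return false;
  c.owner = target_worker;
  return true;
}
```

With this shape, a caller that gets `false` from `TryMigrate` can leave the connection to be closed when its worker is destroyed, which matches the intent of the early return in the diff.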
##########
tests/gocase/unit/config/config_test.go:
##########
@@ -141,3 +144,92 @@ func TestStartWithoutConfigurationFile(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "SET", "foo", "bar").Err())
require.Equal(t, "bar", rdb.Do(ctx, "GET", "foo").Val())
}
+
+func TestDynamicChangeWorkerThread(t *testing.T) {
+ configs := map[string]string{}
+ srv := util.StartServer(t, configs)
+ defer srv.Close()
+
+ ctx := context.Background()
+ rdb := srv.NewClientWithOption(&redis.Options{
+ MaxIdleConns: 20,
+ MaxRetries: -1, // Disable retry to check connections are alive after config change
+ })
+ defer func() { require.NoError(t, rdb.Close()) }()
+
+ t.Run("Test dynamic change worker thread", func(t *testing.T) {
+ runCommands := func(workers int) {
+ var wg sync.WaitGroup
+ require.NoError(t, rdb.Do(ctx, "CONFIG", "SET", "workers", strconv.Itoa(workers)).Err())
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for j := 0; j < 10; j++ {
+ require.NoError(t, rdb.Set(ctx, "foo", "bar", 0).Err())
+ }
+ }()
+ }
+ wg.Wait()
+ }
+ // Reduce worker threads to 4
+ runCommands(4)
+
+ // Reduce worker threads to 1
+ runCommands(1)
+
+ // Reduce worker threads to 12
Review Comment:
Just nitpicking: in this case, we are **increasing** worker threads to 12 :)