Copilot commented on code in PR #978:
URL: https://github.com/apache/skywalking-banyandb/pull/978#discussion_r2840193461


##########
banyand/liaison/grpc/group_deletion.go:
##########
@@ -0,0 +1,383 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "fmt"
+       "sync"
+
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       internalDeletionTaskGroup        = "_deletion_task"
+       internalDeletionTaskPropertyName = "deletion_task"
+       taskDataTagName                  = "task_data"
+)
+
+type groupDeletionTaskManager struct {
+       schemaRegistry metadata.Repo
+       propServer     *propertyServer
+       log            *logger.Logger
+       groupRepo      *groupRepo
+       tasks          sync.Map
+}
+
+func newGroupDeletionTaskManager(schemaRegistry metadata.Repo, propServer 
*propertyServer, gr *groupRepo, l *logger.Logger) *groupDeletionTaskManager {
+       return &groupDeletionTaskManager{
+               schemaRegistry: schemaRegistry,
+               propServer:     propServer,
+               groupRepo:      gr,
+               log:            l,
+       }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context) 
error {
+       group := &commonv1.Group{
+               Metadata: &commonv1.Metadata{
+                       Name: internalDeletionTaskGroup,
+               },
+               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 1,
+               },
+       }
+       _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, 
internalDeletionTaskGroup)
+       if getGroupErr != nil {
+               if createErr := 
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+                       return fmt.Errorf("failed to create internal deletion 
task group: %w", createErr)
+               }
+       }
+       propSchema := &databasev1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group: internalDeletionTaskGroup,
+                       Name:  internalDeletionTaskPropertyName,
+               },
+               Tags: []*databasev1.TagSpec{
+                       {
+                               Name: taskDataTagName,
+                               Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+                       },
+               },
+       }
+       _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx, 
propSchema.Metadata)
+       if getPropErr != nil {
+               if createErr := 
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr 
!= nil {
+                       return fmt.Errorf("failed to create internal deletion 
task property schema: %w", createErr)
+               }
+       }
+       return nil
+}
+
+func (m *groupDeletionTaskManager) startDeletion(ctx context.Context, group 
string) error {
+       if _, loaded := m.tasks.LoadOrStore(group, true); loaded {
+               return fmt.Errorf("deletion task for group %s is already in 
progress", group)
+       }
+       task := &databasev1.GroupDeletionTask{
+               CurrentPhase:  databasev1.GroupDeletionTask_PHASE_PENDING,
+               TotalCounts:   make(map[string]int32),
+               DeletedCounts: make(map[string]int32),
+               CreatedAt:     timestamppb.Now(),
+       }
+       dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+       if dataErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to collect data info for group %s: 
%w", group, dataErr)
+       }
+       var totalDataSize int64
+       for _, di := range dataInfo {
+               totalDataSize += di.GetDataSizeBytes()
+       }
+       task.TotalDataSizeBytes = totalDataSize
+       if saveErr := m.saveDeletionTask(ctx, group, task); saveErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to save initial deletion task for 
group %s: %w", group, saveErr)
+       }
+       go m.executeDeletion(ctx, group, task)

Review Comment:
   Context cancellation risk: The context of the incoming request is passed 
to a goroutine that performs long-running deletion operations. If the client 
disconnects or the request times out, the context will be canceled, 
potentially interrupting the deletion process mid-way. If this is the gRPC 
handler's context, it is also canceled as soon as the handler returns, which 
startDeletion does immediately after launching the goroutine. Consider using 
context.Background() or a server-scoped context for the background deletion 
task to ensure it runs to completion regardless of the original request's 
lifecycle.
   ```suggestion
        go m.executeDeletion(context.Background(), group, task)
   ```



##########
banyand/liaison/grpc/discovery.go:
##########
@@ -138,13 +141,79 @@ func (i identity) String() string {
 
 var _ schema.EventHandler = (*groupRepo)(nil)
 
+type groupInflight struct {
+       done    chan struct{}
+       deleted chan struct{}
+       wg      sync.WaitGroup
+}
+
 type groupRepo struct {
        schema.UnimplementedOnInitHandler
        log          *logger.Logger
        resourceOpts map[string]*commonv1.ResourceOpts
+       inflight     map[string]*groupInflight
        sync.RWMutex
 }
 
+func (s *groupRepo) acquireRequest(groupName string) error {
+       s.RWMutex.Lock()
+       defer s.RWMutex.Unlock()
+       item, ok := s.inflight[groupName]
+       if ok && item.done != nil {
+               return fmt.Errorf("%s: %w", groupName, errGroupPendingDeletion)
+       }
+       if !ok {
+               item = &groupInflight{}
+               s.inflight[groupName] = item
+       }
+       item.wg.Add(1)
+       return nil
+}
+
+func (s *groupRepo) releaseRequest(groupName string) {
+       s.RWMutex.RLock()
+       item, ok := s.inflight[groupName]
+       s.RWMutex.RUnlock()
+       if ok {
+               item.wg.Done()
+       }
+}
+
+func (s *groupRepo) startPendingDeletion(groupName string) <-chan struct{} {
+       s.RWMutex.Lock()
+       item, ok := s.inflight[groupName]
+       if !ok {
+               item = &groupInflight{}
+               s.inflight[groupName] = item
+       }
+       item.done = make(chan struct{})
+       item.deleted = make(chan struct{})
+       s.RWMutex.Unlock()

Review Comment:
   Race condition: There's a potential race between acquireRequest and 
startPendingDeletion. If acquireRequest runs between lines 184-188 of 
startPendingDeletion (after the groupInflight is created but before item.done 
is set), it will pass the check at line 162 (since item.done is still nil) and 
increment the wait group. However, the goroutine at line 192-195 has already 
captured the wait group state and may have already called item.wg.Wait(). This 
could lead to requests being allowed through even though deletion has started. 
Consider setting item.done before releasing the lock in startPendingDeletion, 
or using a separate flag to mark deletion intent. Note: in the hunk quoted 
above, item.done and item.deleted are assigned between s.RWMutex.Lock() and 
s.RWMutex.Unlock(), and acquireRequest takes the same write lock, so this 
interleaving should be confirmed against the full file before changing the 
code.



##########
banyand/internal/wqueue/wqueue.go:
##########
@@ -125,6 +125,15 @@ func (q *Queue[S, O]) Close() error {
        return nil
 }
 
+// Drop closes the queue and removes all data files from disk.
+func (q *Queue[S, O]) Drop() error {
+       if closeErr := q.Close(); closeErr != nil {
+               return closeErr
+       }

Review Comment:
   Missing error check: The MustRMAll method is called after Close returns, but 
if Close fails, the file system may be in an inconsistent state. Additionally, 
MustRMAll can panic on errors (suggested by the "Must" prefix), but this panic 
wouldn't be caught or handled gracefully. Consider checking if the directory 
still exists before calling MustRMAll, or handling potential panics from 
MustRMAll.
   ```suggestion
   func (q *Queue[S, O]) Drop() (err error) {
        if closeErr := q.Close(); closeErr != nil {
                return closeErr
        }
        defer func() {
                if r := recover(); r != nil {
                        err = fmt.Errorf("failed to remove queue data at %s: 
%v", q.location, r)
                }
        }()
   ```



##########
banyand/liaison/grpc/group_deletion.go:
##########
@@ -0,0 +1,383 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "fmt"
+       "sync"
+
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       internalDeletionTaskGroup        = "_deletion_task"
+       internalDeletionTaskPropertyName = "deletion_task"
+       taskDataTagName                  = "task_data"
+)
+
+type groupDeletionTaskManager struct {
+       schemaRegistry metadata.Repo
+       propServer     *propertyServer
+       log            *logger.Logger
+       groupRepo      *groupRepo
+       tasks          sync.Map
+}
+
+func newGroupDeletionTaskManager(schemaRegistry metadata.Repo, propServer 
*propertyServer, gr *groupRepo, l *logger.Logger) *groupDeletionTaskManager {
+       return &groupDeletionTaskManager{
+               schemaRegistry: schemaRegistry,
+               propServer:     propServer,
+               groupRepo:      gr,
+               log:            l,
+       }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context) 
error {
+       group := &commonv1.Group{
+               Metadata: &commonv1.Metadata{
+                       Name: internalDeletionTaskGroup,
+               },
+               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 1,
+               },
+       }
+       _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, 
internalDeletionTaskGroup)
+       if getGroupErr != nil {
+               if createErr := 
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+                       return fmt.Errorf("failed to create internal deletion 
task group: %w", createErr)
+               }
+       }
+       propSchema := &databasev1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group: internalDeletionTaskGroup,
+                       Name:  internalDeletionTaskPropertyName,
+               },
+               Tags: []*databasev1.TagSpec{
+                       {
+                               Name: taskDataTagName,
+                               Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+                       },
+               },
+       }
+       _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx, 
propSchema.Metadata)
+       if getPropErr != nil {
+               if createErr := 
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr 
!= nil {
+                       return fmt.Errorf("failed to create internal deletion 
task property schema: %w", createErr)
+               }
+       }
+       return nil
+}
+
+func (m *groupDeletionTaskManager) startDeletion(ctx context.Context, group 
string) error {
+       if _, loaded := m.tasks.LoadOrStore(group, true); loaded {
+               return fmt.Errorf("deletion task for group %s is already in 
progress", group)
+       }
+       task := &databasev1.GroupDeletionTask{
+               CurrentPhase:  databasev1.GroupDeletionTask_PHASE_PENDING,
+               TotalCounts:   make(map[string]int32),
+               DeletedCounts: make(map[string]int32),
+               CreatedAt:     timestamppb.Now(),
+       }
+       dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+       if dataErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to collect data info for group %s: 
%w", group, dataErr)
+       }
+       var totalDataSize int64
+       for _, di := range dataInfo {
+               totalDataSize += di.GetDataSizeBytes()
+       }
+       task.TotalDataSizeBytes = totalDataSize
+       if saveErr := m.saveDeletionTask(ctx, group, task); saveErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to save initial deletion task for 
group %s: %w", group, saveErr)
+       }
+       go m.executeDeletion(ctx, group, task)
+       return nil
+}
+
+func (m *groupDeletionTaskManager) executeDeletion(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) {
+       defer m.tasks.Delete(group)
+       opt := schema.ListOpt{Group: group}
+
+       task.Message = "waiting for in-flight requests to complete"
+       m.saveProgress(ctx, group, task)
+       done := m.groupRepo.startPendingDeletion(group)
+       defer m.groupRepo.clearPendingDeletion(group)
+       <-done

Review Comment:
   Race condition: clearPendingDeletion is called in a defer before 
awaitDeleted completes in executeDeletion. This means the groupInflight entry 
is deleted from the map by the defer registered at line 132, but then 
awaitDeleted is called at line 162, which tries to read from the deleted 
channel. If the OnDelete handler runs between clearPendingDeletion and 
awaitDeleted, the entry's deleted channel will never be closed, causing 
awaitDeleted to return an already-closed channel instead of waiting. The defer 
for clearPendingDeletion should be moved to after awaitDeleted. Note: a 
deferred call runs only when executeDeletion returns, which on the success 
path is after <-m.groupRepo.awaitDeleted(group), so this ordering should be 
confirmed before moving the defer.



##########
banyand/liaison/grpc/server.go:
##########
@@ -212,13 +215,19 @@ func NewServer(_ context.Context, tir1Client, tir2Client, 
broadcaster queue.Clie
        return s
 }
 
-func (s *server) PreRun(_ context.Context) error {
+func (s *server) PreRun(ctx context.Context) error {
        s.log = logger.GetLogger("liaison-grpc")
        s.streamSVC.setLogger(s.log.Named("stream-t1"))
        s.measureSVC.setLogger(s.log)
        s.traceSVC.setLogger(s.log.Named("trace"))
        s.propertyServer.SetLogger(s.log)
        s.bydbQLSVC.setLogger(s.log.Named("bydbql"))
+       s.groupRegistryServer.deletionTaskManager = newGroupDeletionTaskManager(
+               s.groupRegistryServer.schemaRegistry, s.propertyServer, 
s.groupRepo, s.log.Named("group-deletion"),
+       )
+       if initErr := 
s.groupRegistryServer.deletionTaskManager.initPropertyStorage(ctx); initErr != 
nil {
+               return initErr
+       }

Review Comment:
   Initialization timing issue: The deletionTaskManager is initialized in 
PreRun (line 225), but propertyServer is already in use at this point. If there 
are any ongoing deletion tasks from a previous server instance, they won't be 
resumed or cleaned up. Consider checking for existing deletion tasks during 
initialization and either resuming them or marking them as failed. 
Additionally, the groupRepo's inflight map is initialized in NewServer but the 
deletionTaskManager doesn't check if there are any pending deletions at startup.



##########
banyand/internal/storage/tsdb.go:
##########
@@ -175,6 +175,15 @@ func (d *database[T, O]) Close() error {
        return nil
 }
 
+// Drop closes the database and removes all data files from disk.
+func (d *database[T, O]) Drop() error {
+       if closeErr := d.Close(); closeErr != nil {
+               return closeErr
+       }
+       d.lfs.MustRMAll(d.location)
+       return nil

Review Comment:
   Missing error check: The MustRMAll method is called after Close returns, but 
if Close fails, the file system may be in an inconsistent state. Additionally, 
MustRMAll can panic on errors (suggested by the "Must" prefix), but this panic 
wouldn't be caught or handled gracefully. Consider checking if the directory 
still exists before calling MustRMAll, or handling potential panics from 
MustRMAll.
   ```suggestion
   func (d *database[T, O]) Drop() (err error) {
        if closeErr := d.Close(); closeErr != nil {
                return closeErr
        }
        func() {
                defer func() {
                        if r := recover(); r != nil {
                                err = errors.Errorf("panic while removing 
database directory %s: %v", d.location, r)
                        }
                }()
                d.lfs.MustRMAll(d.location)
        }()
        return err
   ```



##########
banyand/liaison/grpc/group_deletion.go:
##########
@@ -0,0 +1,383 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "fmt"
+       "sync"
+
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       internalDeletionTaskGroup        = "_deletion_task"
+       internalDeletionTaskPropertyName = "deletion_task"
+       taskDataTagName                  = "task_data"
+)
+
+type groupDeletionTaskManager struct {
+       schemaRegistry metadata.Repo
+       propServer     *propertyServer
+       log            *logger.Logger
+       groupRepo      *groupRepo
+       tasks          sync.Map
+}
+
+func newGroupDeletionTaskManager(schemaRegistry metadata.Repo, propServer 
*propertyServer, gr *groupRepo, l *logger.Logger) *groupDeletionTaskManager {
+       return &groupDeletionTaskManager{
+               schemaRegistry: schemaRegistry,
+               propServer:     propServer,
+               groupRepo:      gr,
+               log:            l,
+       }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context) 
error {
+       group := &commonv1.Group{
+               Metadata: &commonv1.Metadata{
+                       Name: internalDeletionTaskGroup,
+               },
+               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 1,
+               },
+       }
+       _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, 
internalDeletionTaskGroup)
+       if getGroupErr != nil {
+               if createErr := 
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+                       return fmt.Errorf("failed to create internal deletion 
task group: %w", createErr)
+               }
+       }
+       propSchema := &databasev1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group: internalDeletionTaskGroup,
+                       Name:  internalDeletionTaskPropertyName,
+               },
+               Tags: []*databasev1.TagSpec{
+                       {
+                               Name: taskDataTagName,
+                               Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+                       },
+               },
+       }
+       _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx, 
propSchema.Metadata)
+       if getPropErr != nil {
+               if createErr := 
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr 
!= nil {
+                       return fmt.Errorf("failed to create internal deletion 
task property schema: %w", createErr)
+               }
+       }
+       return nil
+}
+
+func (m *groupDeletionTaskManager) startDeletion(ctx context.Context, group 
string) error {
+       if _, loaded := m.tasks.LoadOrStore(group, true); loaded {
+               return fmt.Errorf("deletion task for group %s is already in 
progress", group)
+       }
+       task := &databasev1.GroupDeletionTask{
+               CurrentPhase:  databasev1.GroupDeletionTask_PHASE_PENDING,
+               TotalCounts:   make(map[string]int32),
+               DeletedCounts: make(map[string]int32),
+               CreatedAt:     timestamppb.Now(),
+       }
+       dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+       if dataErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to collect data info for group %s: 
%w", group, dataErr)
+       }
+       var totalDataSize int64
+       for _, di := range dataInfo {
+               totalDataSize += di.GetDataSizeBytes()
+       }
+       task.TotalDataSizeBytes = totalDataSize
+       if saveErr := m.saveDeletionTask(ctx, group, task); saveErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to save initial deletion task for 
group %s: %w", group, saveErr)
+       }
+       go m.executeDeletion(ctx, group, task)
+       return nil
+}
+
+func (m *groupDeletionTaskManager) executeDeletion(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) {
+       defer m.tasks.Delete(group)
+       opt := schema.ListOpt{Group: group}
+
+       task.Message = "waiting for in-flight requests to complete"
+       m.saveProgress(ctx, group, task)
+       done := m.groupRepo.startPendingDeletion(group)
+       defer m.groupRepo.clearPendingDeletion(group)
+       <-done
+
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_IN_PROGRESS
+
+       type deletionStep struct {
+               fn      func() error
+               message string
+       }
+       steps := []deletionStep{
+               {func() error { return m.deleteIndexRuleBindings(ctx, opt, 
task) }, "deleting index rule bindings"},
+               {func() error { return m.deleteIndexRules(ctx, opt, task) }, 
"deleting index rules"},
+               {func() error { return m.deleteProperties(ctx, opt, task) }, 
"deleting properties"},
+               {func() error { return m.deleteStreams(ctx, opt, task) }, 
"deleting streams"},
+               {func() error { return m.deleteMeasures(ctx, opt, task) }, 
"deleting measures"},
+               {func() error { return m.deleteTraces(ctx, opt, task) }, 
"deleting traces"},
+               {func() error { return m.deleteTopNAggregations(ctx, opt, task) 
}, "deleting topN aggregations"},
+       }
+       for _, step := range steps {
+               if stepErr := m.runStep(ctx, group, task, step.message, 
step.fn); stepErr != nil {
+                       return
+               }
+       }
+
+       task.Message = "deleting group and data files"
+       _, deleteGroupErr := m.schemaRegistry.GroupRegistry().DeleteGroup(ctx, 
group)
+       if deleteGroupErr != nil {
+               m.failTask(ctx, group, task, fmt.Sprintf("failed to delete 
group: %v", deleteGroupErr))
+               return
+       }
+       <-m.groupRepo.awaitDeleted(group)
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_COMPLETED
+       task.Message = "group deleted successfully"
+       m.saveProgress(ctx, group, task)
+}
+
+func (m *groupDeletionTaskManager) deleteIndexRuleBindings(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       bindings, listErr := 
m.schemaRegistry.IndexRuleBindingRegistry().ListIndexRuleBinding(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["index_rule_binding"] = int32(len(bindings))
+       for _, binding := range bindings {
+               if _, deleteErr := 
m.schemaRegistry.IndexRuleBindingRegistry().DeleteIndexRuleBinding(ctx, 
binding.GetMetadata()); deleteErr != nil {
+                       return fmt.Errorf("index rule binding %s: %w", 
binding.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["index_rule_binding"] = 
task.TotalCounts["index_rule_binding"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteIndexRules(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       indexRules, listErr := 
m.schemaRegistry.IndexRuleRegistry().ListIndexRule(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["index_rule"] = int32(len(indexRules))
+       for _, rule := range indexRules {
+               if _, deleteErr := 
m.schemaRegistry.IndexRuleRegistry().DeleteIndexRule(ctx, rule.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("index rule %s: %w", 
rule.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["index_rule"] = task.TotalCounts["index_rule"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteProperties(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       properties, listErr := 
m.schemaRegistry.PropertyRegistry().ListProperty(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["property"] = int32(len(properties))
+       for _, prop := range properties {
+               if _, deleteErr := 
m.schemaRegistry.PropertyRegistry().DeleteProperty(ctx, prop.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("property %s: %w", 
prop.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["property"] = task.TotalCounts["property"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteStreams(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       streams, listErr := m.schemaRegistry.StreamRegistry().ListStream(ctx, 
opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["stream"] = int32(len(streams))
+       for _, stream := range streams {
+               if _, deleteErr := 
m.schemaRegistry.StreamRegistry().DeleteStream(ctx, stream.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("stream %s: %w", 
stream.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["stream"] = task.TotalCounts["stream"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteMeasures(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       measures, listErr := 
m.schemaRegistry.MeasureRegistry().ListMeasure(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["measure"] = int32(len(measures))
+       for _, measure := range measures {
+               if _, deleteErr := 
m.schemaRegistry.MeasureRegistry().DeleteMeasure(ctx, measure.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("measure %s: %w", 
measure.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["measure"] = task.TotalCounts["measure"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteTraces(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       traces, listErr := m.schemaRegistry.TraceRegistry().ListTrace(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["trace"] = int32(len(traces))
+       for _, trace := range traces {
+               if _, deleteErr := 
m.schemaRegistry.TraceRegistry().DeleteTrace(ctx, trace.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("trace %s: %w", 
trace.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["trace"] = task.TotalCounts["trace"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteTopNAggregations(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       topNAggs, listErr := 
m.schemaRegistry.TopNAggregationRegistry().ListTopNAggregation(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["topn_aggregation"] = int32(len(topNAggs))
+       for _, agg := range topNAggs {
+               if _, deleteErr := 
m.schemaRegistry.TopNAggregationRegistry().DeleteTopNAggregation(ctx, 
agg.GetMetadata()); deleteErr != nil {
+                       return fmt.Errorf("topN aggregation %s: %w", 
agg.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["topn_aggregation"] = 
task.TotalCounts["topn_aggregation"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) runStep(
+       ctx context.Context, group string, task *databasev1.GroupDeletionTask,
+       message string, fn func() error,
+) error {
+       task.Message = message
+       if stepErr := fn(); stepErr != nil {
+               m.failTask(ctx, group, task, fmt.Sprintf("%s failed: %v", 
message, stepErr))
+               return stepErr
+       }
+       m.saveProgress(ctx, group, task)
+       return nil
+}
+
+func (m *groupDeletionTaskManager) saveProgress(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) {
+       snapshot := proto.Clone(task).(*databasev1.GroupDeletionTask)
+       if saveErr := m.saveDeletionTask(ctx, group, snapshot); saveErr != nil {
+               m.log.Error().Err(saveErr).Str("group", group).Msg("failed to 
save deletion progress")
+       }
+}
+
+func (m *groupDeletionTaskManager) failTask(
+       ctx context.Context, group string, task *databasev1.GroupDeletionTask, 
msg string,
+) {
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_FAILED
+       task.Message = msg
+       m.log.Error().Str("group", group).Msg(msg)
+       m.saveProgress(ctx, group, task)
+}
+
+func (m *groupDeletionTaskManager) saveDeletionTask(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) error {
+       taskData, marshalErr := proto.Marshal(task)
+       if marshalErr != nil {
+               return fmt.Errorf("failed to marshal deletion task: %w", 
marshalErr)
+       }
+       _, applyErr := m.propServer.Apply(ctx, &propertyv1.ApplyRequest{
+               Property: &propertyv1.Property{
+                       Metadata: &commonv1.Metadata{
+                               Group: internalDeletionTaskGroup,
+                               Name:  internalDeletionTaskPropertyName,
+                       },
+                       Id: group,
+                       Tags: []*modelv1.Tag{
+                               {
+                                       Key:   taskDataTagName,
+                                       Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_BinaryData{BinaryData: taskData}},
+                               },
+                       },
+               },
+               Strategy: propertyv1.ApplyRequest_STRATEGY_REPLACE,
+       })

Review Comment:
   Potential deadlock: saveDeletionTask calls propServer.Apply which now 
includes acquireRequest checks. When saving deletion task state to the internal 
"_deletion_task" group, this could fail if that group is pending deletion or 
cause blocking. The internal deletion task group should be excluded from the 
acquireRequest/releaseRequest mechanism since it's used to track deletion state 
itself.



##########
banyand/liaison/grpc/group_deletion.go:
##########
@@ -0,0 +1,383 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "fmt"
+       "sync"
+
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       internalDeletionTaskGroup        = "_deletion_task"
+       internalDeletionTaskPropertyName = "deletion_task"
+       taskDataTagName                  = "task_data"
+)
+
+type groupDeletionTaskManager struct {
+       schemaRegistry metadata.Repo
+       propServer     *propertyServer
+       log            *logger.Logger
+       groupRepo      *groupRepo
+       tasks          sync.Map
+}
+
+func newGroupDeletionTaskManager(schemaRegistry metadata.Repo, propServer 
*propertyServer, gr *groupRepo, l *logger.Logger) *groupDeletionTaskManager {
+       return &groupDeletionTaskManager{
+               schemaRegistry: schemaRegistry,
+               propServer:     propServer,
+               groupRepo:      gr,
+               log:            l,
+       }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context) 
error {
+       group := &commonv1.Group{
+               Metadata: &commonv1.Metadata{
+                       Name: internalDeletionTaskGroup,
+               },
+               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 1,
+               },
+       }
+       _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, 
internalDeletionTaskGroup)
+       if getGroupErr != nil {
+               if createErr := 
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+                       return fmt.Errorf("failed to create internal deletion 
task group: %w", createErr)
+               }
+       }
+       propSchema := &databasev1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group: internalDeletionTaskGroup,
+                       Name:  internalDeletionTaskPropertyName,
+               },
+               Tags: []*databasev1.TagSpec{
+                       {
+                               Name: taskDataTagName,
+                               Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+                       },
+               },
+       }
+       _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx, 
propSchema.Metadata)
+       if getPropErr != nil {
+               if createErr := 
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr 
!= nil {
+                       return fmt.Errorf("failed to create internal deletion 
task property schema: %w", createErr)
+               }
+       }
+       return nil
+}
+
+func (m *groupDeletionTaskManager) startDeletion(ctx context.Context, group 
string) error {
+       if _, loaded := m.tasks.LoadOrStore(group, true); loaded {
+               return fmt.Errorf("deletion task for group %s is already in 
progress", group)
+       }
+       task := &databasev1.GroupDeletionTask{
+               CurrentPhase:  databasev1.GroupDeletionTask_PHASE_PENDING,
+               TotalCounts:   make(map[string]int32),
+               DeletedCounts: make(map[string]int32),
+               CreatedAt:     timestamppb.Now(),
+       }
+       dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+       if dataErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to collect data info for group %s: 
%w", group, dataErr)
+       }
+       var totalDataSize int64
+       for _, di := range dataInfo {
+               totalDataSize += di.GetDataSizeBytes()
+       }
+       task.TotalDataSizeBytes = totalDataSize
+       if saveErr := m.saveDeletionTask(ctx, group, task); saveErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to save initial deletion task for 
group %s: %w", group, saveErr)
+       }
+       go m.executeDeletion(ctx, group, task)
+       return nil
+}
+
+func (m *groupDeletionTaskManager) executeDeletion(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) {
+       defer m.tasks.Delete(group)
+       opt := schema.ListOpt{Group: group}
+
+       task.Message = "waiting for in-flight requests to complete"
+       m.saveProgress(ctx, group, task)
+       done := m.groupRepo.startPendingDeletion(group)
+       defer m.groupRepo.clearPendingDeletion(group)
+       <-done
+
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_IN_PROGRESS
+
+       type deletionStep struct {
+               fn      func() error
+               message string
+       }
+       steps := []deletionStep{
+               {func() error { return m.deleteIndexRuleBindings(ctx, opt, 
task) }, "deleting index rule bindings"},
+               {func() error { return m.deleteIndexRules(ctx, opt, task) }, 
"deleting index rules"},
+               {func() error { return m.deleteProperties(ctx, opt, task) }, 
"deleting properties"},
+               {func() error { return m.deleteStreams(ctx, opt, task) }, 
"deleting streams"},
+               {func() error { return m.deleteMeasures(ctx, opt, task) }, 
"deleting measures"},
+               {func() error { return m.deleteTraces(ctx, opt, task) }, 
"deleting traces"},
+               {func() error { return m.deleteTopNAggregations(ctx, opt, task) 
}, "deleting topN aggregations"},
+       }
+       for _, step := range steps {
+               if stepErr := m.runStep(ctx, group, task, step.message, 
step.fn); stepErr != nil {
+                       return
+               }
+       }
+
+       task.Message = "deleting group and data files"
+       _, deleteGroupErr := m.schemaRegistry.GroupRegistry().DeleteGroup(ctx, 
group)
+       if deleteGroupErr != nil {
+               m.failTask(ctx, group, task, fmt.Sprintf("failed to delete 
group: %v", deleteGroupErr))
+               return
+       }
+       <-m.groupRepo.awaitDeleted(group)
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_COMPLETED
+       task.Message = "group deleted successfully"
+       m.saveProgress(ctx, group, task)
+}
+
+func (m *groupDeletionTaskManager) deleteIndexRuleBindings(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       bindings, listErr := 
m.schemaRegistry.IndexRuleBindingRegistry().ListIndexRuleBinding(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["index_rule_binding"] = int32(len(bindings))
+       for _, binding := range bindings {
+               if _, deleteErr := 
m.schemaRegistry.IndexRuleBindingRegistry().DeleteIndexRuleBinding(ctx, 
binding.GetMetadata()); deleteErr != nil {
+                       return fmt.Errorf("index rule binding %s: %w", 
binding.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["index_rule_binding"] = 
task.TotalCounts["index_rule_binding"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteIndexRules(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       indexRules, listErr := 
m.schemaRegistry.IndexRuleRegistry().ListIndexRule(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["index_rule"] = int32(len(indexRules))
+       for _, rule := range indexRules {
+               if _, deleteErr := 
m.schemaRegistry.IndexRuleRegistry().DeleteIndexRule(ctx, rule.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("index rule %s: %w", 
rule.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["index_rule"] = task.TotalCounts["index_rule"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteProperties(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       properties, listErr := 
m.schemaRegistry.PropertyRegistry().ListProperty(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["property"] = int32(len(properties))
+       for _, prop := range properties {
+               if _, deleteErr := 
m.schemaRegistry.PropertyRegistry().DeleteProperty(ctx, prop.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("property %s: %w", 
prop.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["property"] = task.TotalCounts["property"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteStreams(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       streams, listErr := m.schemaRegistry.StreamRegistry().ListStream(ctx, 
opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["stream"] = int32(len(streams))
+       for _, stream := range streams {
+               if _, deleteErr := 
m.schemaRegistry.StreamRegistry().DeleteStream(ctx, stream.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("stream %s: %w", 
stream.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["stream"] = task.TotalCounts["stream"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteMeasures(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       measures, listErr := 
m.schemaRegistry.MeasureRegistry().ListMeasure(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["measure"] = int32(len(measures))
+       for _, measure := range measures {
+               if _, deleteErr := 
m.schemaRegistry.MeasureRegistry().DeleteMeasure(ctx, measure.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("measure %s: %w", 
measure.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["measure"] = task.TotalCounts["measure"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteTraces(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       traces, listErr := m.schemaRegistry.TraceRegistry().ListTrace(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["trace"] = int32(len(traces))
+       for _, trace := range traces {
+               if _, deleteErr := 
m.schemaRegistry.TraceRegistry().DeleteTrace(ctx, trace.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("trace %s: %w", 
trace.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["trace"] = task.TotalCounts["trace"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteTopNAggregations(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       topNAggs, listErr := 
m.schemaRegistry.TopNAggregationRegistry().ListTopNAggregation(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["topn_aggregation"] = int32(len(topNAggs))
+       for _, agg := range topNAggs {
+               if _, deleteErr := 
m.schemaRegistry.TopNAggregationRegistry().DeleteTopNAggregation(ctx, 
agg.GetMetadata()); deleteErr != nil {
+                       return fmt.Errorf("topN aggregation %s: %w", 
agg.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["topn_aggregation"] = 
task.TotalCounts["topn_aggregation"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) runStep(
+       ctx context.Context, group string, task *databasev1.GroupDeletionTask,
+       message string, fn func() error,
+) error {
+       task.Message = message
+       if stepErr := fn(); stepErr != nil {
+               m.failTask(ctx, group, task, fmt.Sprintf("%s failed: %v", 
message, stepErr))
+               return stepErr
+       }
+       m.saveProgress(ctx, group, task)
+       return nil
+}
+
+func (m *groupDeletionTaskManager) saveProgress(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) {
+       snapshot := proto.Clone(task).(*databasev1.GroupDeletionTask)
+       if saveErr := m.saveDeletionTask(ctx, group, snapshot); saveErr != nil {
+               m.log.Error().Err(saveErr).Str("group", group).Msg("failed to 
save deletion progress")
+       }
+}
+
+func (m *groupDeletionTaskManager) failTask(
+       ctx context.Context, group string, task *databasev1.GroupDeletionTask, 
msg string,
+) {
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_FAILED
+       task.Message = msg
+       m.log.Error().Str("group", group).Msg(msg)
+       m.saveProgress(ctx, group, task)
+}
+
+func (m *groupDeletionTaskManager) saveDeletionTask(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) error {
+       taskData, marshalErr := proto.Marshal(task)
+       if marshalErr != nil {
+               return fmt.Errorf("failed to marshal deletion task: %w", 
marshalErr)
+       }
+       _, applyErr := m.propServer.Apply(ctx, &propertyv1.ApplyRequest{
+               Property: &propertyv1.Property{
+                       Metadata: &commonv1.Metadata{
+                               Group: internalDeletionTaskGroup,
+                               Name:  internalDeletionTaskPropertyName,
+                       },
+                       Id: group,
+                       Tags: []*modelv1.Tag{
+                               {
+                                       Key:   taskDataTagName,
+                                       Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_BinaryData{BinaryData: taskData}},
+                               },
+                       },
+               },
+               Strategy: propertyv1.ApplyRequest_STRATEGY_REPLACE,
+       })
+       if applyErr != nil {
+               return fmt.Errorf("failed to save deletion task property: %w", 
applyErr)
+       }
+       return nil
+}
+
+func (m *groupDeletionTaskManager) getDeletionTask(ctx context.Context, group 
string) (*databasev1.GroupDeletionTask, error) {
+       resp, queryErr := m.propServer.Query(ctx, &propertyv1.QueryRequest{
+               Groups: []string{internalDeletionTaskGroup},
+               Name:   internalDeletionTaskPropertyName,
+               Ids:    []string{group},
+               Limit:  1,
+       })
+       if queryErr != nil {
+               return nil, fmt.Errorf("failed to query deletion task property: 
%w", queryErr)
+       }
+       if len(resp.Properties) == 0 {
+               return nil, fmt.Errorf("deletion task for group %s not found", 
group)
+       }
+       for _, tag := range resp.Properties[0].Tags {
+               if tag.Key == taskDataTagName {
+                       binaryData := tag.Value.GetBinaryData()
+                       if binaryData == nil {
+                               return nil, fmt.Errorf("deletion task for group 
%s has no binary data", group)
+                       }
+                       var task databasev1.GroupDeletionTask
+                       if unmarshalErr := proto.Unmarshal(binaryData, &task); 
unmarshalErr != nil {
+                               return nil, fmt.Errorf("failed to unmarshal 
deletion task: %w", unmarshalErr)
+                       }
+                       return &task, nil
+               }
+       }
+       return nil, fmt.Errorf("deletion task for group %s has no task_data 
tag", group)
+}
+
+func (m *groupDeletionTaskManager) hasNonEmptyResources(ctx context.Context, 
group string) (bool, error) {
+       dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+       if dataErr != nil {
+               return false, fmt.Errorf("failed to collect data info: %w", 
dataErr)
+       }
+       for _, di := range dataInfo {
+               if di.GetDataSizeBytes() > 0 {
+                       return true, nil
+               }
+       }
+       return false, nil
+}

Review Comment:
   Missing test coverage: This new file implements critical deletion 
functionality including concurrent request tracking, background task execution, 
and data cleanup. However, no tests have been added to verify these complex 
behaviors. Consider adding tests for: task state management, concurrent request 
blocking during deletion, context cancellation handling, error recovery 
scenarios, and the complete deletion workflow.



##########
banyand/liaison/grpc/group_deletion.go:
##########
@@ -0,0 +1,383 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "fmt"
+       "sync"
+
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       internalDeletionTaskGroup        = "_deletion_task"
+       internalDeletionTaskPropertyName = "deletion_task"
+       taskDataTagName                  = "task_data"
+)
+
+type groupDeletionTaskManager struct {
+       schemaRegistry metadata.Repo
+       propServer     *propertyServer
+       log            *logger.Logger
+       groupRepo      *groupRepo
+       tasks          sync.Map
+}
+
+func newGroupDeletionTaskManager(schemaRegistry metadata.Repo, propServer 
*propertyServer, gr *groupRepo, l *logger.Logger) *groupDeletionTaskManager {
+       return &groupDeletionTaskManager{
+               schemaRegistry: schemaRegistry,
+               propServer:     propServer,
+               groupRepo:      gr,
+               log:            l,
+       }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context) 
error {
+       group := &commonv1.Group{
+               Metadata: &commonv1.Metadata{
+                       Name: internalDeletionTaskGroup,
+               },
+               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum: 1,
+               },
+       }
+       _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx, 
internalDeletionTaskGroup)
+       if getGroupErr != nil {
+               if createErr := 
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+                       return fmt.Errorf("failed to create internal deletion 
task group: %w", createErr)
+               }
+       }
+       propSchema := &databasev1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group: internalDeletionTaskGroup,
+                       Name:  internalDeletionTaskPropertyName,
+               },
+               Tags: []*databasev1.TagSpec{
+                       {
+                               Name: taskDataTagName,
+                               Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+                       },
+               },
+       }
+       _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx, 
propSchema.Metadata)
+       if getPropErr != nil {
+               if createErr := 
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr 
!= nil {
+                       return fmt.Errorf("failed to create internal deletion 
task property schema: %w", createErr)
+               }
+       }
+       return nil
+}
+
+func (m *groupDeletionTaskManager) startDeletion(ctx context.Context, group 
string) error {
+       if _, loaded := m.tasks.LoadOrStore(group, true); loaded {
+               return fmt.Errorf("deletion task for group %s is already in 
progress", group)
+       }
+       task := &databasev1.GroupDeletionTask{
+               CurrentPhase:  databasev1.GroupDeletionTask_PHASE_PENDING,
+               TotalCounts:   make(map[string]int32),
+               DeletedCounts: make(map[string]int32),
+               CreatedAt:     timestamppb.Now(),
+       }
+       dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+       if dataErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to collect data info for group %s: 
%w", group, dataErr)
+       }
+       var totalDataSize int64
+       for _, di := range dataInfo {
+               totalDataSize += di.GetDataSizeBytes()
+       }
+       task.TotalDataSizeBytes = totalDataSize
+       if saveErr := m.saveDeletionTask(ctx, group, task); saveErr != nil {
+               m.tasks.Delete(group)
+               return fmt.Errorf("failed to save initial deletion task for 
group %s: %w", group, saveErr)
+       }
+       go m.executeDeletion(ctx, group, task)
+       return nil
+}
+
+func (m *groupDeletionTaskManager) executeDeletion(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) {
+       defer m.tasks.Delete(group)
+       opt := schema.ListOpt{Group: group}
+
+       task.Message = "waiting for in-flight requests to complete"
+       m.saveProgress(ctx, group, task)
+       done := m.groupRepo.startPendingDeletion(group)
+       defer m.groupRepo.clearPendingDeletion(group)
+       <-done
+
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_IN_PROGRESS
+
+       type deletionStep struct {
+               fn      func() error
+               message string
+       }
+       steps := []deletionStep{
+               {func() error { return m.deleteIndexRuleBindings(ctx, opt, 
task) }, "deleting index rule bindings"},
+               {func() error { return m.deleteIndexRules(ctx, opt, task) }, 
"deleting index rules"},
+               {func() error { return m.deleteProperties(ctx, opt, task) }, 
"deleting properties"},
+               {func() error { return m.deleteStreams(ctx, opt, task) }, 
"deleting streams"},
+               {func() error { return m.deleteMeasures(ctx, opt, task) }, 
"deleting measures"},
+               {func() error { return m.deleteTraces(ctx, opt, task) }, 
"deleting traces"},
+               {func() error { return m.deleteTopNAggregations(ctx, opt, task) 
}, "deleting topN aggregations"},
+       }
+       for _, step := range steps {
+               if stepErr := m.runStep(ctx, group, task, step.message, 
step.fn); stepErr != nil {
+                       return
+               }
+       }
+
+       task.Message = "deleting group and data files"
+       _, deleteGroupErr := m.schemaRegistry.GroupRegistry().DeleteGroup(ctx, 
group)
+       if deleteGroupErr != nil {
+               m.failTask(ctx, group, task, fmt.Sprintf("failed to delete 
group: %v", deleteGroupErr))
+               return
+       }
+       <-m.groupRepo.awaitDeleted(group)
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_COMPLETED
+       task.Message = "group deleted successfully"
+       m.saveProgress(ctx, group, task)
+}
+
+func (m *groupDeletionTaskManager) deleteIndexRuleBindings(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       bindings, listErr := 
m.schemaRegistry.IndexRuleBindingRegistry().ListIndexRuleBinding(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["index_rule_binding"] = int32(len(bindings))
+       for _, binding := range bindings {
+               if _, deleteErr := 
m.schemaRegistry.IndexRuleBindingRegistry().DeleteIndexRuleBinding(ctx, 
binding.GetMetadata()); deleteErr != nil {
+                       return fmt.Errorf("index rule binding %s: %w", 
binding.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["index_rule_binding"] = 
task.TotalCounts["index_rule_binding"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteIndexRules(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       indexRules, listErr := 
m.schemaRegistry.IndexRuleRegistry().ListIndexRule(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["index_rule"] = int32(len(indexRules))
+       for _, rule := range indexRules {
+               if _, deleteErr := 
m.schemaRegistry.IndexRuleRegistry().DeleteIndexRule(ctx, rule.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("index rule %s: %w", 
rule.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["index_rule"] = task.TotalCounts["index_rule"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteProperties(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       properties, listErr := 
m.schemaRegistry.PropertyRegistry().ListProperty(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["property"] = int32(len(properties))
+       for _, prop := range properties {
+               if _, deleteErr := 
m.schemaRegistry.PropertyRegistry().DeleteProperty(ctx, prop.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("property %s: %w", 
prop.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["property"] = task.TotalCounts["property"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteStreams(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       streams, listErr := m.schemaRegistry.StreamRegistry().ListStream(ctx, 
opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["stream"] = int32(len(streams))
+       for _, stream := range streams {
+               if _, deleteErr := 
m.schemaRegistry.StreamRegistry().DeleteStream(ctx, stream.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("stream %s: %w", 
stream.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["stream"] = task.TotalCounts["stream"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteMeasures(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       measures, listErr := 
m.schemaRegistry.MeasureRegistry().ListMeasure(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["measure"] = int32(len(measures))
+       for _, measure := range measures {
+               if _, deleteErr := 
m.schemaRegistry.MeasureRegistry().DeleteMeasure(ctx, measure.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("measure %s: %w", 
measure.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["measure"] = task.TotalCounts["measure"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteTraces(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       traces, listErr := m.schemaRegistry.TraceRegistry().ListTrace(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["trace"] = int32(len(traces))
+       for _, trace := range traces {
+               if _, deleteErr := 
m.schemaRegistry.TraceRegistry().DeleteTrace(ctx, trace.GetMetadata()); 
deleteErr != nil {
+                       return fmt.Errorf("trace %s: %w", 
trace.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["trace"] = task.TotalCounts["trace"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) deleteTopNAggregations(
+       ctx context.Context, opt schema.ListOpt, task 
*databasev1.GroupDeletionTask,
+) error {
+       topNAggs, listErr := 
m.schemaRegistry.TopNAggregationRegistry().ListTopNAggregation(ctx, opt)
+       if listErr != nil {
+               return listErr
+       }
+       task.TotalCounts["topn_aggregation"] = int32(len(topNAggs))
+       for _, agg := range topNAggs {
+               if _, deleteErr := 
m.schemaRegistry.TopNAggregationRegistry().DeleteTopNAggregation(ctx, 
agg.GetMetadata()); deleteErr != nil {
+                       return fmt.Errorf("topN aggregation %s: %w", 
agg.GetMetadata().GetName(), deleteErr)
+               }
+       }
+       task.DeletedCounts["topn_aggregation"] = 
task.TotalCounts["topn_aggregation"]
+       return nil
+}
+
+func (m *groupDeletionTaskManager) runStep(
+       ctx context.Context, group string, task *databasev1.GroupDeletionTask,
+       message string, fn func() error,
+) error {
+       task.Message = message
+       if stepErr := fn(); stepErr != nil {
+               m.failTask(ctx, group, task, fmt.Sprintf("%s failed: %v", 
message, stepErr))
+               return stepErr
+       }
+       m.saveProgress(ctx, group, task)
+       return nil
+}
+
+func (m *groupDeletionTaskManager) saveProgress(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) {
+       snapshot := proto.Clone(task).(*databasev1.GroupDeletionTask)
+       if saveErr := m.saveDeletionTask(ctx, group, snapshot); saveErr != nil {
+               m.log.Error().Err(saveErr).Str("group", group).Msg("failed to 
save deletion progress")
+       }
+}
+
+func (m *groupDeletionTaskManager) failTask(
+       ctx context.Context, group string, task *databasev1.GroupDeletionTask, 
msg string,
+) {
+       task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_FAILED
+       task.Message = msg
+       m.log.Error().Str("group", group).Msg(msg)
+       m.saveProgress(ctx, group, task)
+}
+
+func (m *groupDeletionTaskManager) saveDeletionTask(ctx context.Context, group 
string, task *databasev1.GroupDeletionTask) error {
+       taskData, marshalErr := proto.Marshal(task)
+       if marshalErr != nil {
+               return fmt.Errorf("failed to marshal deletion task: %w", 
marshalErr)
+       }
+       _, applyErr := m.propServer.Apply(ctx, &propertyv1.ApplyRequest{
+               Property: &propertyv1.Property{
+                       Metadata: &commonv1.Metadata{
+                               Group: internalDeletionTaskGroup,
+                               Name:  internalDeletionTaskPropertyName,
+                       },
+                       Id: group,
+                       Tags: []*modelv1.Tag{
+                               {
+                                       Key:   taskDataTagName,
+                                       Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_BinaryData{BinaryData: taskData}},
+                               },
+                       },
+               },
+               Strategy: propertyv1.ApplyRequest_STRATEGY_REPLACE,
+       })
+       if applyErr != nil {
+               return fmt.Errorf("failed to save deletion task property: %w", 
applyErr)
+       }
+       return nil
+}
+
+func (m *groupDeletionTaskManager) getDeletionTask(ctx context.Context, group 
string) (*databasev1.GroupDeletionTask, error) {
+       resp, queryErr := m.propServer.Query(ctx, &propertyv1.QueryRequest{
+               Groups: []string{internalDeletionTaskGroup},
+               Name:   internalDeletionTaskPropertyName,
+               Ids:    []string{group},
+               Limit:  1,
+       })

Review Comment:
   Potential issue: getDeletionTask calls propServer.Query, which now includes 
acquireRequest checks. When querying the internal "_deletion_task" group to 
retrieve deletion task state, the call could fail if that group is itself 
pending deletion. The internal deletion task group should be excluded from the 
acquireRequest/releaseRequest mechanism, since it is used to track deletion 
state itself.



##########
api/proto/banyandb/database/v1/rpc.proto:
##########
@@ -358,21 +358,13 @@ message GroupRegistryServiceUpdateResponse {}
 message GroupRegistryServiceDeleteRequest {
   // group is the name of the group to delete.
   string group = 1;
-  // dry_run indicates whether to perform a dry run without actually deleting 
data.
-  // When true, returns what would be deleted without making changes.
-  bool dry_run = 2;
   // force indicates whether to force delete the group even if it contains 
data.
   // When false, deletion will fail if the group is not empty.
-  bool force = 3;
+  bool force = 2;

Review Comment:
   Reusing the protobuf field number `2` for `force` after removing the 
`dry_run` field can cause requests from older clients compiled against the 
previous schema to be misinterpreted by newer servers. A legacy client that 
sends `dry_run=true, force=false` (field 2 set, field 3 unset) will now have 
its request decoded by the updated server as `force=true`, potentially turning 
what callers believe is a non-destructive “dry run” into a forced deletion of 
non-empty groups. To avoid this, keep `force` on its original field number and 
either reserve field `2` or keep the `dry_run` field (even if unused) so that 
mixed-version deployments cannot accidentally escalate a dry-run call into a 
destructive operation.



##########
banyand/liaison/grpc/registry.go:
##########
@@ -618,21 +619,34 @@ func (rs *groupRegistryServer) Update(ctx 
context.Context, req *databasev1.Group
 func (rs *groupRegistryServer) Delete(ctx context.Context, req 
*databasev1.GroupRegistryServiceDeleteRequest) (
        *databasev1.GroupRegistryServiceDeleteResponse, error,
 ) {
-       g := ""
+       g := req.GetGroup()
        rs.metrics.totalRegistryStarted.Inc(1, g, "group", "delete")
        start := time.Now()
        defer func() {
                rs.metrics.totalRegistryFinished.Inc(1, g, "group", "delete")
                
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group", 
"delete")
        }()
-       deleted, err := rs.schemaRegistry.GroupRegistry().DeleteGroup(ctx, 
req.GetGroup())
-       if err != nil {
+       if _, getErr := rs.schemaRegistry.GroupRegistry().GetGroup(ctx, g); 
getErr != nil {
                rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
-               return nil, err
+               return nil, getErr
+       }
+       if !req.GetForce() {
+               hasResources, checkErr := 
rs.deletionTaskManager.hasNonEmptyResources(ctx, g)
+               if hasResources {
+                       rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+                       return nil, status.Errorf(codes.FailedPrecondition,
+                               "group %s is not empty, use force=true to 
delete non-empty groups", g)
+               }
+               if checkErr != nil {
+                       rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
+                       return nil, checkErr
+               }

Review Comment:
   Logic issue: The order of checks is incorrect. The code checks if 
hasResources is true first (line 635), then checks if checkErr is not nil (line 
640). This means if checkErr is not nil AND hasResources is true, the function 
will return the FailedPrecondition error instead of the checkErr. The error 
check should be performed before using the hasResources result to ensure we 
don't return misleading errors when the check itself failed.
   ```suggestion
                if checkErr != nil {
                        rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
                        return nil, checkErr
                }
                if hasResources {
                        rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
                        return nil, status.Errorf(codes.FailedPrecondition,
                                "group %s is not empty, use force=true to 
delete non-empty groups", g)
                }
   ```



##########
banyand/liaison/grpc/discovery.go:
##########
@@ -138,13 +141,79 @@ func (i identity) String() string {
 
 var _ schema.EventHandler = (*groupRepo)(nil)
 
+type groupInflight struct {
+       done    chan struct{}
+       deleted chan struct{}
+       wg      sync.WaitGroup
+}
+
 type groupRepo struct {
        schema.UnimplementedOnInitHandler
        log          *logger.Logger
        resourceOpts map[string]*commonv1.ResourceOpts
+       inflight     map[string]*groupInflight
        sync.RWMutex
 }
 
+func (s *groupRepo) acquireRequest(groupName string) error {
+       s.RWMutex.Lock()
+       defer s.RWMutex.Unlock()
+       item, ok := s.inflight[groupName]
+       if ok && item.done != nil {
+               return fmt.Errorf("%s: %w", groupName, errGroupPendingDeletion)
+       }
+       if !ok {
+               item = &groupInflight{}
+               s.inflight[groupName] = item
+       }
+       item.wg.Add(1)
+       return nil
+}
+
+func (s *groupRepo) releaseRequest(groupName string) {
+       s.RWMutex.RLock()
+       item, ok := s.inflight[groupName]
+       s.RWMutex.RUnlock()
+       if ok {
+               item.wg.Done()
+       }
+}
+
+func (s *groupRepo) startPendingDeletion(groupName string) <-chan struct{} {
+       s.RWMutex.Lock()
+       item, ok := s.inflight[groupName]
+       if !ok {
+               item = &groupInflight{}
+               s.inflight[groupName] = item
+       }
+       item.done = make(chan struct{})
+       item.deleted = make(chan struct{})
+       s.RWMutex.Unlock()
+       go func() {
+               item.wg.Wait()
+               close(item.done)
+       }()
+       return item.done
+}
+
+func (s *groupRepo) awaitDeleted(groupName string) <-chan struct{} {
+       s.RWMutex.RLock()
+       item, ok := s.inflight[groupName]
+       s.RWMutex.RUnlock()
+       if ok && item.deleted != nil {
+               return item.deleted
+       }
+       ch := make(chan struct{})
+       close(ch)
+       return ch
+}
+
+func (s *groupRepo) clearPendingDeletion(groupName string) {
+       s.RWMutex.Lock()
+       defer s.RWMutex.Unlock()
+       delete(s.inflight, groupName)
+}

Review Comment:
   Missing test coverage: The inflight tracking mechanism (acquireRequest, 
releaseRequest, startPendingDeletion) is critical for preventing data 
corruption during group deletion but has no test coverage. Consider adding 
tests for: concurrent acquireRequest calls, race conditions between acquire and 
startPendingDeletion, proper wait group handling, and channel lifecycle 
management.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to