Copilot commented on code in PR #978:
URL:
https://github.com/apache/skywalking-banyandb/pull/978#discussion_r2853582646
##########
banyand/property/service.go:
##########
@@ -204,6 +215,33 @@ func (s *service) PreRun(ctx context.Context) error {
)
}
+type propertyGroupEventHandler struct {
+ svc *service
+}
+
+func (h *propertyGroupEventHandler) OnInit([]schema.Kind) (bool, []int64) {
+ return false, nil
+}
+
+func (h *propertyGroupEventHandler) OnAddOrUpdate(_ schema.Metadata) {}
+
+func (h *propertyGroupEventHandler) OnDelete(md schema.Metadata) {
+ if md.Kind != schema.KindGroup {
+ return
+ }
+ group := md.Spec.(*commonv1.Group)
+ if group.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
+ return
+ }
+ groupName := group.Metadata.GetName()
+ if dropErr := h.svc.db.Drop(groupName); dropErr != nil {
+ h.svc.l.Error().Err(dropErr).Str("group",
groupName).Msg("failed to drop group")
+ }
+ if ch, loaded := h.svc.pendingGroupDrops.LoadAndDelete(groupName);
loaded {
+ close(ch.(chan struct{}))
+ }
Review Comment:
The drop-notification channel is closed even when `db.Drop(groupName)`
fails, which can signal "drop completed" to waiters while data is still
present. Consider only closing the channel on successful drop, and/or recording
the drop error so the deletion task can fail instead of completing.
##########
banyand/liaison/grpc/measure.go:
##########
@@ -358,6 +366,16 @@ func (ms *measureService) handleWriteCleanup(publisher
queue.BatchPublisher, suc
var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints:
make([]*measurev1.DataPoint, 0)}
func (ms *measureService) Query(ctx context.Context, req
*measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
+ for _, g := range req.Groups {
+ if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr !=
nil {
+ return nil, status.Errorf(codes.FailedPrecondition,
"group %s is pending deletion", g)
+ }
+ }
+ defer func() {
+ for _, g := range req.Groups {
Review Comment:
If `acquireRequest` fails partway through iterating `req.Groups`, earlier
successful acquisitions are leaked because the function returns immediately.
This can block group deletion indefinitely. Release already-acquired groups
before returning an error.
```suggestion
acquiredGroups := make([]string, 0, len(req.Groups))
for _, g := range req.Groups {
if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr !=
nil {
for _, ag := range acquiredGroups {
ms.groupRepo.releaseRequest(ag)
}
return nil, status.Errorf(codes.FailedPrecondition,
"group %s is pending deletion", g)
}
acquiredGroups = append(acquiredGroups, g)
}
defer func() {
for _, g := range acquiredGroups {
```
##########
banyand/property/service.go:
##########
@@ -122,6 +126,12 @@ func (s *service) Validate() error {
return nil
}
+func (s *service) SubscribeGroupDrop(groupName string) <-chan struct{} {
Review Comment:
`SubscribeGroupDrop` stores a new channel unconditionally. If called
multiple times for the same group, earlier subscribers will block forever
because their channel is replaced and never closed. Consider returning an
existing channel if present or closing the previous one before replacing it.
```suggestion
func (s *service) SubscribeGroupDrop(groupName string) <-chan struct{} {
if existing, ok := s.pendingGroupDrops.Load(groupName); ok {
if ch, ok := existing.(chan struct{}); ok {
return ch
}
}
```
##########
pkg/schema/cache.go:
##########
@@ -108,6 +109,13 @@ func (sr *schemaRepo) StopCh() <-chan struct{} {
return sr.closer.CloseNotify()
}
+// SubscribeGroupDrop returns a channel that is closed after the group's
physical storage is dropped.
+func (sr *schemaRepo) SubscribeGroupDrop(groupName string) <-chan struct{} {
+ ch := make(chan struct{})
+ sr.pendingGroupDrops.Store(groupName, ch)
+ return ch
Review Comment:
`SubscribeGroupDrop` overwrites any existing channel for the same group name
without closing/returning the prior one. A previous waiter would then block
forever and the map entry can leak if the group is never deleted. Consider
returning an existing channel if present, or closing/replacing the previous
channel explicitly (and/or adding an unsubscribe mechanism).
##########
banyand/liaison/grpc/deletion.go:
##########
@@ -0,0 +1,408 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+ internalDeletionTaskGroup = "_deletion_task"
+ internalDeletionTaskPropertyName = "deletion_task"
+ taskDataTagName = "task_data"
+)
+
+type propertyApplier interface {
+ Apply(ctx context.Context, req *propertyv1.ApplyRequest)
(*propertyv1.ApplyResponse, error)
+ Query(ctx context.Context, req *propertyv1.QueryRequest)
(*propertyv1.QueryResponse, error)
+}
+
+// GroupDropSubscriber defines an interface for subscribing to group drop
events.
+type GroupDropSubscriber interface {
+ SubscribeGroupDrop(catalog commonv1.Catalog, groupName string) <-chan
struct{}
+}
+
+type groupDeletionTaskManager struct {
+ schemaRegistry metadata.Repo
+ propServer propertyApplier
+ log *logger.Logger
+ groupRepo *groupRepo
+ dropSubscriber GroupDropSubscriber
+ tasks sync.Map
+}
+
+func newGroupDeletionTaskManager(
+ schemaRegistry metadata.Repo, propServer *propertyServer, gr
*groupRepo, dropSubscriber GroupDropSubscriber, l *logger.Logger,
+) *groupDeletionTaskManager {
+ return &groupDeletionTaskManager{
+ schemaRegistry: schemaRegistry,
+ propServer: propServer,
+ groupRepo: gr,
+ dropSubscriber: dropSubscriber,
+ log: l,
+ }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context)
error {
+ group := &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: internalDeletionTaskGroup,
+ },
+ Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 1,
+ },
+ }
+ _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx,
internalDeletionTaskGroup)
+ if getGroupErr != nil {
+ if createErr :=
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+ return fmt.Errorf("failed to create internal deletion
task group: %w", createErr)
+ }
+ }
+ propSchema := &databasev1.Property{
+ Metadata: &commonv1.Metadata{
+ Group: internalDeletionTaskGroup,
+ Name: internalDeletionTaskPropertyName,
+ },
+ Tags: []*databasev1.TagSpec{
+ {
+ Name: taskDataTagName,
+ Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+ },
+ },
+ }
+ _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx,
propSchema.Metadata)
+ if getPropErr != nil {
+ if createErr :=
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr
!= nil {
+ return fmt.Errorf("failed to create internal deletion
task property schema: %w", createErr)
+ }
+ }
Review Comment:
`initPropertyStorage` treats any `GetGroup`/`GetProperty` error as "not
found" and proceeds to create the internal resources. If the error is transient
(e.g., permission/network), this can mask the real issue and attempt an invalid
create. Only create on a confirmed NotFound (e.g., `errors.As(err,
schema.ErrGRPCResourceNotFound)` / gRPC `codes.NotFound`), and return the
original error otherwise.
##########
banyand/liaison/grpc/measure.go:
##########
@@ -424,6 +442,16 @@ func (ms *measureService) Query(ctx context.Context, req
*measurev1.QueryRequest
}
func (ms *measureService) TopN(ctx context.Context, topNRequest
*measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) {
+ for _, g := range topNRequest.GetGroups() {
+ if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr !=
nil {
+ return nil, status.Errorf(codes.FailedPrecondition,
"group %s is pending deletion", g)
+ }
+ }
+ defer func() {
+ for _, g := range topNRequest.GetGroups() {
Review Comment:
In `TopN`, an `acquireRequest` failure returns immediately without releasing
groups already acquired earlier in the loop. This leaks the in-flight reference
and can block deletion. Acquire with rollback/release-on-error.
```suggestion
acquiredGroups := make([]string, 0, len(topNRequest.GetGroups()))
for _, g := range topNRequest.GetGroups() {
if acquireErr := ms.groupRepo.acquireRequest(g); acquireErr !=
nil {
for _, ag := range acquiredGroups {
ms.groupRepo.releaseRequest(ag)
}
return nil, status.Errorf(codes.FailedPrecondition,
"group %s is pending deletion", g)
}
acquiredGroups = append(acquiredGroups, g)
}
defer func() {
for _, g := range acquiredGroups {
```
##########
banyand/liaison/grpc/stream.go:
##########
@@ -320,6 +329,16 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
var emptyStreamQueryResponse = &streamv1.QueryResponse{Elements:
make([]*streamv1.Element, 0)}
func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest)
(resp *streamv1.QueryResponse, err error) {
+ for _, g := range req.Groups {
+ if acquireErr := s.groupRepo.acquireRequest(g); acquireErr !=
nil {
+ return nil, status.Errorf(codes.FailedPrecondition,
"group %s is pending deletion", g)
+ }
Review Comment:
If `acquireRequest` fails for a later group in `req.Groups`, any groups
acquired earlier in this loop are never released because the function returns
immediately. This can leak in-flight counters and block pending deletions.
Track which groups were successfully acquired and release them before returning
on error.
```suggestion
func (s *streamService) Query(ctx context.Context, req
*streamv1.QueryRequest) (resp *streamv1.QueryResponse, err error) {
acquiredGroups := make([]string, 0, len(req.Groups))
for _, g := range req.Groups {
if acquireErr := s.groupRepo.acquireRequest(g); acquireErr !=
nil {
for _, ag := range acquiredGroups {
s.groupRepo.releaseRequest(ag)
}
return nil, status.Errorf(codes.FailedPrecondition,
"group %s is pending deletion", g)
}
acquiredGroups = append(acquiredGroups, g)
```
##########
banyand/liaison/grpc/property.go:
##########
@@ -415,6 +424,16 @@ func (ps *propertyServer) Delete(ctx context.Context, req
*propertyv1.DeleteRequ
}
func (ps *propertyServer) Query(ctx context.Context, req
*propertyv1.QueryRequest) (resp *propertyv1.QueryResponse, err error) {
+ for _, g := range req.Groups {
+ if acquireErr := ps.groupRepo.acquireRequest(g); acquireErr !=
nil {
Review Comment:
If `acquireRequest` fails for one of the requested groups, earlier acquired
groups are not released due to the immediate return. This leaks the in-flight
count and can cause group deletion to hang. Track acquired groups and release
them before returning on error.
```suggestion
for i, g := range req.Groups {
if acquireErr := ps.groupRepo.acquireRequest(g); acquireErr !=
nil {
// Release all groups that were successfully acquired
before this failure.
for j := 0; j < i; j++ {
ps.groupRepo.releaseRequest(req.Groups[j])
}
```
##########
api/proto/banyandb/database/v1/rpc.proto:
##########
@@ -358,21 +358,13 @@ message GroupRegistryServiceUpdateResponse {}
message GroupRegistryServiceDeleteRequest {
// group is the name of the group to delete.
string group = 1;
- // dry_run indicates whether to perform a dry run without actually deleting
data.
- // When true, returns what would be deleted without making changes.
- bool dry_run = 2;
// force indicates whether to force delete the group even if it contains
data.
// When false, deletion will fail if the group is not empty.
- bool force = 3;
+ bool force = 2;
}
Review Comment:
In protobuf, field numbers must never be reused. Reassigning `force` from
tag 3 to tag 2 (previously `dry_run`) will cause old clients to have
`dry_run=true` interpreted as `force=true` and breaks wire compatibility. Keep
`force = 3` and mark tag 2 as `reserved` (and keep `dry_run` removed) instead.
##########
banyand/liaison/grpc/deletion.go:
##########
@@ -0,0 +1,408 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+ internalDeletionTaskGroup = "_deletion_task"
+ internalDeletionTaskPropertyName = "deletion_task"
+ taskDataTagName = "task_data"
+)
+
+type propertyApplier interface {
+ Apply(ctx context.Context, req *propertyv1.ApplyRequest)
(*propertyv1.ApplyResponse, error)
+ Query(ctx context.Context, req *propertyv1.QueryRequest)
(*propertyv1.QueryResponse, error)
+}
+
+// GroupDropSubscriber defines an interface for subscribing to group drop
events.
+type GroupDropSubscriber interface {
+ SubscribeGroupDrop(catalog commonv1.Catalog, groupName string) <-chan
struct{}
+}
+
+type groupDeletionTaskManager struct {
+ schemaRegistry metadata.Repo
+ propServer propertyApplier
+ log *logger.Logger
+ groupRepo *groupRepo
+ dropSubscriber GroupDropSubscriber
+ tasks sync.Map
+}
+
+func newGroupDeletionTaskManager(
+ schemaRegistry metadata.Repo, propServer *propertyServer, gr
*groupRepo, dropSubscriber GroupDropSubscriber, l *logger.Logger,
+) *groupDeletionTaskManager {
+ return &groupDeletionTaskManager{
+ schemaRegistry: schemaRegistry,
+ propServer: propServer,
+ groupRepo: gr,
+ dropSubscriber: dropSubscriber,
+ log: l,
+ }
+}
+
+func (m *groupDeletionTaskManager) initPropertyStorage(ctx context.Context)
error {
+ group := &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: internalDeletionTaskGroup,
+ },
+ Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 1,
+ },
+ }
+ _, getGroupErr := m.schemaRegistry.GroupRegistry().GetGroup(ctx,
internalDeletionTaskGroup)
+ if getGroupErr != nil {
+ if createErr :=
m.schemaRegistry.GroupRegistry().CreateGroup(ctx, group); createErr != nil {
+ return fmt.Errorf("failed to create internal deletion
task group: %w", createErr)
+ }
+ }
+ propSchema := &databasev1.Property{
+ Metadata: &commonv1.Metadata{
+ Group: internalDeletionTaskGroup,
+ Name: internalDeletionTaskPropertyName,
+ },
+ Tags: []*databasev1.TagSpec{
+ {
+ Name: taskDataTagName,
+ Type: databasev1.TagType_TAG_TYPE_DATA_BINARY,
+ },
+ },
+ }
+ _, getPropErr := m.schemaRegistry.PropertyRegistry().GetProperty(ctx,
propSchema.Metadata)
+ if getPropErr != nil {
+ if createErr :=
m.schemaRegistry.PropertyRegistry().CreateProperty(ctx, propSchema); createErr
!= nil {
+ return fmt.Errorf("failed to create internal deletion
task property schema: %w", createErr)
+ }
+ }
+ return nil
+}
+
+func (m *groupDeletionTaskManager) startDeletion(ctx context.Context, group
string) error {
+ if _, loaded := m.tasks.LoadOrStore(group, true); loaded {
+ return fmt.Errorf("deletion task for group %s is already in
progress", group)
+ }
+ task := &databasev1.GroupDeletionTask{
+ CurrentPhase: databasev1.GroupDeletionTask_PHASE_PENDING,
+ TotalCounts: make(map[string]int32),
+ DeletedCounts: make(map[string]int32),
+ CreatedAt: timestamppb.Now(),
+ }
+ dataInfo, dataErr := m.schemaRegistry.CollectDataInfo(ctx, group)
+ if dataErr != nil {
+ m.tasks.Delete(group)
+ return fmt.Errorf("failed to collect data info for group %s:
%w", group, dataErr)
+ }
+ var totalDataSize int64
+ for _, di := range dataInfo {
+ totalDataSize += di.GetDataSizeBytes()
+ }
+ task.TotalDataSizeBytes = totalDataSize
+ if saveErr := m.saveDeletionTask(ctx, group, task); saveErr != nil {
+ m.tasks.Delete(group)
+ return fmt.Errorf("failed to save initial deletion task for
group %s: %w", group, saveErr)
+ }
+ go m.executeDeletion(context.WithoutCancel(ctx), group, task)
+ return nil
+}
+
+func (m *groupDeletionTaskManager) executeDeletion(ctx context.Context, group
string, task *databasev1.GroupDeletionTask) {
+ defer m.tasks.Delete(group)
+ opt := schema.ListOpt{Group: group}
+
+ task.Message = "waiting for in-flight requests to complete"
+ m.saveProgress(ctx, group, task)
+ done := m.groupRepo.startPendingDeletion(group)
+ defer m.groupRepo.clearPendingDeletion(group)
+ <-done
+
+ task.CurrentPhase = databasev1.GroupDeletionTask_PHASE_IN_PROGRESS
+
+ type deletionStep struct {
+ fn func() error
+ message string
+ }
+ steps := []deletionStep{
+ {func() error { return m.deleteIndexRuleBindings(ctx, opt,
task) }, "deleting index rule bindings"},
+ {func() error { return m.deleteIndexRules(ctx, opt, task) },
"deleting index rules"},
+ {func() error { return m.deleteProperties(ctx, opt, task) },
"deleting properties"},
+ {func() error { return m.deleteStreams(ctx, opt, task) },
"deleting streams"},
+ {func() error { return m.deleteMeasures(ctx, opt, task) },
"deleting measures"},
+ {func() error { return m.deleteTraces(ctx, opt, task) },
"deleting traces"},
+ {func() error { return m.deleteTopNAggregations(ctx, opt, task)
}, "deleting topN aggregations"},
+ }
+ for _, step := range steps {
+ if stepErr := m.runStep(ctx, group, task, step.message,
step.fn); stepErr != nil {
+ return
+ }
+ }
+
+ task.Message = "deleting group and data files"
+ var dropCh <-chan struct{}
+ if m.dropSubscriber != nil {
+ if groupMeta, getErr :=
m.schemaRegistry.GroupRegistry().GetGroup(ctx, group); getErr == nil {
+ dropCh =
m.dropSubscriber.SubscribeGroupDrop(groupMeta.Catalog, group)
+ }
+ }
+ _, deleteGroupErr := m.schemaRegistry.GroupRegistry().DeleteGroup(ctx,
group)
+ if deleteGroupErr != nil {
+ m.failTask(ctx, group, task, fmt.Sprintf("failed to delete
group: %v", deleteGroupErr))
+ if dropCh != nil {
+ go func() { <-dropCh }()
Review Comment:
On `DeleteGroup` error, spawning a goroutine to wait on `dropCh` can leak
forever because the drop channel is only closed on a successful group-delete
event. It also leaves the subscriber's pending-drop entry around. Prefer
unsubscribing/closing the drop subscription on error (or only subscribing after
`DeleteGroup` succeeds), or wait with a bounded timeout/context and clean up
the subscription.
```suggestion
select {
case <-dropCh:
default:
}
```
##########
banyand/property/db/db.go:
##########
@@ -374,6 +376,27 @@ func (db *database) getShard(group string, id
common.ShardID) (*shard, bool) {
return nil, false
}
+// Drop closes and removes all shards for the given group and deletes the
group directory.
+func (db *database) Drop(groupName string) error {
+ value, ok := db.groups.LoadAndDelete(groupName)
+ if !ok {
+ return nil
+ }
+ gs := value.(*groupShards)
+ sLst := gs.shards.Load()
+ if sLst != nil {
+ var err error
+ for _, s := range *sLst {
+ multierr.AppendInto(&err, s.close())
+ }
+ if err != nil {
+ return err
+ }
+ }
+ db.lfs.MustRMAll(gs.location)
+ return nil
Review Comment:
`Drop` uses `MustRMAll`, which can panic and crash the process if the
filesystem removal fails. Other new Drop implementations recover and return an
error instead. Prefer a non-panicking removal (or wrap `MustRMAll` with
`defer`/`recover`) so `Drop` reliably returns an error rather than taking down
the node.
##########
banyand/liaison/grpc/trace.go:
##########
@@ -409,6 +418,16 @@ func (s *traceService) Write(stream
tracev1.TraceService_WriteServer) error {
var emptyTraceQueryResponse = &tracev1.QueryResponse{Traces:
make([]*tracev1.Trace, 0)}
func (s *traceService) Query(ctx context.Context, req *tracev1.QueryRequest)
(resp *tracev1.QueryResponse, err error) {
+ for _, g := range req.Groups {
+ if acquireErr := s.groupRepo.acquireRequest(g); acquireErr !=
nil {
Review Comment:
Same issue as in stream Query: if `acquireRequest` fails for a later group,
previously acquired groups are not released due to the early return. This can
prevent group deletion from ever completing. Consider acquiring with rollback
(release already-acquired groups) on failure.
```suggestion
for i, g := range req.Groups {
if acquireErr := s.groupRepo.acquireRequest(g); acquireErr !=
nil {
// Roll back previously acquired groups before
returning the error.
for j := 0; j < i; j++ {
s.groupRepo.releaseRequest(req.Groups[j])
}
```
##########
pkg/schema/cache.go:
##########
@@ -197,6 +205,9 @@ func (sr *schemaRepo) Watcher() {
switch evt.Kind {
case EventKindGroup:
err =
sr.deleteGroup(evt.Metadata.GetMetadata())
+ if dropCh, loaded :=
sr.pendingGroupDrops.LoadAndDelete(evt.Metadata.GetMetadata().GetName());
loaded {
+
close(dropCh.(chan struct{}))
+ }
Review Comment:
The drop notification channel is closed regardless of whether `deleteGroup`
succeeded. If `g.drop()` fails, listeners will be notified even though storage
may still exist, and the deletion task can incorrectly mark completion. Only
close the drop channel after `deleteGroup` returns nil; if it fails, keep the
subscription and let callers observe/handle the failure (or propagate an error).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]