Copilot commented on code in PR #1366: URL: https://github.com/apache/dubbo-admin/pull/1366#discussion_r2604961724
########## pkg/core/lock/component.go: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +const ( + // DistributedLockComponent is the component type for distributed lock + DistributedLockComponent runtime.ComponentType = "distributed lock" +) + +// Component implements the runtime.Component interface for distributed lock +type Component struct { + lock Lock +} + +// NewComponent creates a new distributed lock component +func NewComponent() *Component { + return &Component{} +} + +// Type returns the component type +func (c *Component) Type() runtime.ComponentType { + return DistributedLockComponent +} + +// Order indicates the initialization order +// Lock should be initialized after Store (Order 100) but before other services +func (c *Component) Order() int { + return 90 // After Store, before Console +} + +// Init initializes the distributed lock component +func (c *Component) Init(ctx runtime.BuilderContext) error { + // Get the store component to access connection pool + storeComp, err := ctx.GetActivatedComponent(runtime.ResourceStore) + if err != nil { + return err + } + + // Try to extract connection pool from store component + // We need to use type assertion with the proper interface + type ConnectionPoolProvider interface { + GetConnectionPool() *dbcommon.ConnectionPool + } + + storeWithPool, ok := storeComp.(ConnectionPoolProvider) + if !ok { + // For memory store or other stores without connection pool + logger.Warnf("Store component does not provide connection pool, distributed lock will not be available") + return nil + } + + pool := storeWithPool.GetConnectionPool() + if pool == nil { + logger.Warnf("Connection pool is nil, distributed lock will not be available") + return nil + } + + // Create GORM-based lock implementation using NewGormLock + c.lock = NewGormLock(pool) + + // Initialize the lock table + db := pool.GetDB() + if err := db.AutoMigrate(&LockRecord{}); err != nil { + return errors.Wrap(err, "failed to migrate lock table") + } + + logger.Info("Distributed lock component initialized successfully") + return nil +} + +// Start starts the distributed lock component +func (c *Component) Start(rt runtime.Runtime, stop <-chan struct{}) error { + if c.lock == nil { + logger.Warn("Distributed lock not available, skipping") + return nil + } + + // Start background cleanup task + ticker := time.NewTicker(5 * time.Minute) // Cleanup every 5 minutes + defer ticker.Stop() + + logger.Info("Distributed lock cleanup task started") + + for { + select { + case <-stop: + logger.Info("Distributed lock cleanup task stopped") + return nil + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := c.lock.CleanupExpiredLocks(ctx); err != nil { + logger.Errorf("Failed to cleanup expired locks: %v", err) Review Comment: Extra space before %v - there are two spaces instead of one. ```suggestion logger.Errorf("Failed to cleanup expired locks: %v", err) ``` ########## pkg/core/lock/component.go: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +const ( + // DistributedLockComponent is the component type for distributed lock + DistributedLockComponent runtime.ComponentType = "distributed lock" +) + +// Component implements the runtime.Component interface for distributed lock +type Component struct { + lock Lock +} + +// NewComponent creates a new distributed lock component +func NewComponent() *Component { + return &Component{} +} + +// Type returns the component type +func (c *Component) Type() runtime.ComponentType { + return DistributedLockComponent +} + +// Order indicates the initialization order +// Lock should be initialized after Store (Order 100) but before other services +func (c *Component) Order() int { + return 90 // After Store, before Console +} + +// Init initializes the distributed lock component +func (c *Component) Init(ctx runtime.BuilderContext) error { + // Get the store component to access connection pool + storeComp, err := ctx.GetActivatedComponent(runtime.ResourceStore) + if err != nil { + return err + } + + // Try to extract connection pool from store component + // We need to use type assertion with the proper interface + type ConnectionPoolProvider interface { + GetConnectionPool() *dbcommon.ConnectionPool + } + + storeWithPool, ok := storeComp.(ConnectionPoolProvider) + if !ok { + // For memory store or other stores without connection pool + logger.Warnf("Store component does not provide connection pool, distributed lock will not be available") + return nil + } Review Comment: The `ConnectionPoolProvider` interface is defined and used to extract the connection pool from the store component, but there is no actual implementation of this interface in the store component. The store component (`pkg/core/store/component.go`) does not expose a `GetConnectionPool()` method, which means the distributed lock initialization will always fail silently with a warning log instead of working properly. You need to add a `GetConnectionPool()` method to the store component that returns the connection pool from one of the underlying stores. ########## pkg/console/service/condition_rule.go: ########## @@ -73,14 +76,46 @@ func GetConditionRule(ctx context.Context, name string, mesh string) (*meshresou } func UpdateConditionRule(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { + lock := ctx.LockManager() + if lock == nil { + // Lock not available, proceed without lock protection + return updateConditionRuleUnsafe(ctx, name, res) + } + + // Use distributed lock to prevent concurrent modifications + lockKey := fmt.Sprintf("condition_route:%s:%s", res.Mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return updateConditionRuleUnsafe(ctx, name, res) + }) +} + +func updateConditionRuleUnsafe(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { if err := ctx.ResourceManager().Update(res); err != nil { logger.Warnf("update %s condition failed with error: %s", name, err.Error()) return err } + logger.Infof("Condition route %s updated successfully", name) return nil } func CreateConditionRule(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { + lock := ctx.LockManager() + if lock == nil { + return createConditionRuleUnsafe(ctx, name, res) + } + + lockKey := fmt.Sprintf("condition_route:%s:%s", res.Mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return createConditionRuleUnsafe(ctx, name, res) + }) +} + +// createConditionRuleUnsafe performs the actual creation without lock protection Review Comment: The comment "createConditionRuleUnsafe performs the actual creation without lock protection" would be more consistent if it also appeared for `updateConditionRuleUnsafe` at line 94 (which has no comment) and for the similar "Unsafe" functions in other files. Consider adding comments to all "*Unsafe" functions for consistency. ########## pkg/core/lock/gorm_lock.go: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +// Ensure GormLock implements Lock interface +var _ Lock = (*GormLock)(nil) + +// GormLock provides distributed locking using database as backend +// It uses GORM for database operations and supports MySQL, PostgreSQL, etc. +type GormLock struct { + pool *dbcommon.ConnectionPool + owner string // Unique identifier for this lock instance +} + +// NewGormLock creates a new GORM-based distributed lock instance +func NewGormLock(pool *dbcommon.ConnectionPool) Lock { + return &GormLock{ + pool: pool, + owner: uuid.New().String(), + } +} + +// Lock acquires a lock with the specified key and TTL +// It blocks until the lock is acquired or context is cancelled +func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) error { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + acquired, err := g.TryLock(ctx, key, ttl) + if err != nil { + return fmt.Errorf("failed to try lock: %w", err) + } + if acquired { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +// TryLock attempts to acquire a lock without blocking +// Returns true if lock was acquired, false otherwise +func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) { + db := g.pool.GetDB().WithContext(ctx) + expireAt := time.Now().Add(ttl) + + var acquired bool + err := db.Transaction(func(tx *gorm.DB) error { + // Clean up only this key's expired lock to improve performance + now := time.Now() + if err := tx.Where("lock_key = ? AND expire_at < ?", key, now). + Delete(&LockRecord{}).Error; err != nil { + return fmt.Errorf("failed to clean expired lock for key %s: %w", key, err) + } + + // Try to acquire lock using INSERT ... ON CONFLICT + lock := &LockRecord{ + LockKey: key, + Owner: g.owner, + ExpireAt: expireAt, + } + + // Try to insert the lock record + result := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "lock_key"}}, + DoNothing: true, // If conflict, do nothing + }).Create(lock) + + if result.Error != nil { + return fmt.Errorf("failed to insert lock record: %w", result.Error) + } + + // Check if we got the lock by verifying the owner + var existingLock LockRecord + if err := tx.Where("lock_key = ? ", key).First(&existingLock).Error; err != nil { + return fmt.Errorf("failed to verify lock ownership: %w", err) + } + + // Determine if we acquired the lock + acquired = existingLock.Owner == g.owner + return nil + }) + + if err != nil { + return false, err + } + + if acquired { + logger.Debugf("Lock acquired: key=%s, owner=%s, ttl=%v", key, g.owner, ttl) + } + + return acquired, nil +} + +// Unlock releases a lock held by this instance +func (g *GormLock) Unlock(ctx context.Context, key string) error { + db := g.pool.GetDB().WithContext(ctx) + + result := db.Where("lock_key = ? AND owner = ?", key, g.owner). + Delete(&LockRecord{}) + + if result.Error != nil { + return fmt.Errorf("failed to release lock: %w", result.Error) + } + + if result.RowsAffected == 0 { + return ErrLockNotHeld + } + + logger.Debugf("Lock released: key=%s, owner=%s", key, g.owner) + return nil +} + +// Renew extends the TTL of a lock held by this instance +func (g *GormLock) Renew(ctx context.Context, key string, ttl time.Duration) error { + db := g.pool.GetDB().WithContext(ctx) + newExpireAt := time.Now().Add(ttl) + + result := db.Model(&LockRecord{}). + Where("lock_key = ? AND owner = ?", key, g.owner). + Update("expire_at", newExpireAt) + + if result.Error != nil { + return fmt.Errorf("failed to renew lock: %w", result.Error) + } + + if result.RowsAffected == 0 { + return ErrLockNotHeld + } + + logger.Debugf("Lock renewed: key=%s, owner=%s, new_expire_at=%v", key, g.owner, newExpireAt) + return nil +} + +// IsLocked checks if a lock is currently held (by anyone) +func (g *GormLock) IsLocked(ctx context.Context, key string) (bool, error) { + db := g.pool.GetDB().WithContext(ctx) + + var count int64 + err := db.Model(&LockRecord{}). + Where("lock_key = ? AND expire_at > ?", key, time.Now()). + Count(&count).Error + + if err != nil { + return false, fmt.Errorf("failed to check lock status: %w", err) + } + + return count > 0, nil +} + +// WithLock executes a function while holding a lock +// It automatically acquires the lock, executes the function, and releases the lock +// If TTL is longer than 10 seconds, it will automatically renew the lock until the function completes +func (g *GormLock) WithLock(ctx context.Context, key string, ttl time.Duration, fn func() error) error { + // Acquire lock + if err := g.Lock(ctx, key, ttl); err != nil { + return fmt.Errorf("failed to acquire lock: %w", err) + } + + // Ensure lock is released + defer func() { + // Use background context for unlock to ensure it completes even if ctx is cancelled + unlockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := g.Unlock(unlockCtx, key); err != nil { + logger.Errorf("Failed to release lock %s: %v", key, err) + } + }() + + // Start auto-renewal if TTL is long enough + var renewDone chan struct{} + if ttl > 10*time.Second { + renewDone = make(chan struct{}) + go g.autoRenew(ctx, key, ttl, renewDone) + defer close(renewDone) Review Comment: The auto-renewal goroutine is started if TTL > 10 seconds, but there's no guarantee it will complete before the `defer close(renewDone)` is executed at line 209. If the function `fn()` returns very quickly (before the first renewal), the done channel will be closed, but the goroutine will still try to renew once (at line 234) before checking the done channel. This is not necessarily a bug, but the goroutine could perform one unnecessary renewal attempt. Consider checking the done channel before starting the ticker or document this behavior. ########## pkg/core/bootstrap/bootstrap.go: ########## @@ -147,3 +153,10 @@ func initAndActivateComponent(builder *runtime.Builder, comp runtime.Component) } return nil } +func initDistributedLock(builder *runtime.Builder) error { + comp, err := runtime.ComponentRegistry().Get(lock.DistributedLockComponent) + if err != nil { + return err + } + return initAndActivateComponent(builder, comp) +} Review Comment: The `initDistributedLock` function doesn't follow the same pattern as other initialization functions which typically call both `initAndActivateComponent`. Additionally, the function lacks a blank line separator from the closing brace above, which is inconsistent with the rest of the file's formatting style. ########## pkg/core/lock/gorm_lock.go: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +// Ensure GormLock implements Lock interface +var _ Lock = (*GormLock)(nil) + +// GormLock provides distributed locking using database as backend +// It uses GORM for database operations and supports MySQL, PostgreSQL, etc. +type GormLock struct { + pool *dbcommon.ConnectionPool + owner string // Unique identifier for this lock instance +} + +// NewGormLock creates a new GORM-based distributed lock instance +func NewGormLock(pool *dbcommon.ConnectionPool) Lock { + return &GormLock{ + pool: pool, + owner: uuid.New().String(), + } +} + +// Lock acquires a lock with the specified key and TTL +// It blocks until the lock is acquired or context is cancelled +func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) error { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + acquired, err := g.TryLock(ctx, key, ttl) + if err != nil { + return fmt.Errorf("failed to try lock: %w", err) + } + if acquired { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +// TryLock attempts to acquire a lock without blocking +// Returns true if lock was acquired, false otherwise +func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) { + db := g.pool.GetDB().WithContext(ctx) + expireAt := time.Now().Add(ttl) + + var acquired bool + err := db.Transaction(func(tx *gorm.DB) error { + // Clean up only this key's expired lock to improve performance + now := time.Now() + if err := tx.Where("lock_key = ? AND expire_at < ?", key, now). + Delete(&LockRecord{}).Error; err != nil { + return fmt.Errorf("failed to clean expired lock for key %s: %w", key, err) + } + + // Try to acquire lock using INSERT ... ON CONFLICT + lock := &LockRecord{ + LockKey: key, + Owner: g.owner, + ExpireAt: expireAt, + } + + // Try to insert the lock record + result := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "lock_key"}}, + DoNothing: true, // If conflict, do nothing + }).Create(lock) + + if result.Error != nil { + return fmt.Errorf("failed to insert lock record: %w", result.Error) Review Comment: Extra space before %w - there are two spaces instead of one. ```suggestion return fmt.Errorf("failed to insert lock record: %w", result.Error) ``` ########## pkg/core/lock/gorm_lock.go: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +// Ensure GormLock implements Lock interface +var _ Lock = (*GormLock)(nil) + +// GormLock provides distributed locking using database as backend +// It uses GORM for database operations and supports MySQL, PostgreSQL, etc. +type GormLock struct { + pool *dbcommon.ConnectionPool + owner string // Unique identifier for this lock instance +} + +// NewGormLock creates a new GORM-based distributed lock instance +func NewGormLock(pool *dbcommon.ConnectionPool) Lock { + return &GormLock{ + pool: pool, + owner: uuid.New().String(), + } +} + +// Lock acquires a lock with the specified key and TTL +// It blocks until the lock is acquired or context is cancelled +func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) error { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + acquired, err := g.TryLock(ctx, key, ttl) + if err != nil { + return fmt.Errorf("failed to try lock: %w", err) + } + if acquired { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +// TryLock attempts to acquire a lock without blocking +// Returns true if lock was acquired, false otherwise +func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) { + db := g.pool.GetDB().WithContext(ctx) + expireAt := time.Now().Add(ttl) + + var acquired bool + err := db.Transaction(func(tx *gorm.DB) error { + // Clean up only this key's expired lock to improve performance + now := time.Now() + if err := tx.Where("lock_key = ? AND expire_at < ?", key, now). + Delete(&LockRecord{}).Error; err != nil { + return fmt.Errorf("failed to clean expired lock for key %s: %w", key, err) + } + + // Try to acquire lock using INSERT ... ON CONFLICT + lock := &LockRecord{ + LockKey: key, + Owner: g.owner, + ExpireAt: expireAt, + } + + // Try to insert the lock record + result := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "lock_key"}}, + DoNothing: true, // If conflict, do nothing + }).Create(lock) + + if result.Error != nil { + return fmt.Errorf("failed to insert lock record: %w", result.Error) + } + + // Check if we got the lock by verifying the owner + var existingLock LockRecord + if err := tx.Where("lock_key = ? ", key).First(&existingLock).Error; err != nil { + return fmt.Errorf("failed to verify lock ownership: %w", err) + } + + // Determine if we acquired the lock + acquired = existingLock.Owner == g.owner + return nil + }) + + if err != nil { + return false, err + } + + if acquired { + logger.Debugf("Lock acquired: key=%s, owner=%s, ttl=%v", key, g.owner, ttl) + } + + return acquired, nil +} + +// Unlock releases a lock held by this instance +func (g *GormLock) Unlock(ctx context.Context, key string) error { + db := g.pool.GetDB().WithContext(ctx) + + result := db.Where("lock_key = ? AND owner = ?", key, g.owner). + Delete(&LockRecord{}) + + if result.Error != nil { + return fmt.Errorf("failed to release lock: %w", result.Error) + } + + if result.RowsAffected == 0 { + return ErrLockNotHeld + } + + logger.Debugf("Lock released: key=%s, owner=%s", key, g.owner) + return nil +} + +// Renew extends the TTL of a lock held by this instance +func (g *GormLock) Renew(ctx context.Context, key string, ttl time.Duration) error { + db := g.pool.GetDB().WithContext(ctx) + newExpireAt := time.Now().Add(ttl) + + result := db.Model(&LockRecord{}). + Where("lock_key = ? AND owner = ?", key, g.owner). + Update("expire_at", newExpireAt) + + if result.Error != nil { + return fmt.Errorf("failed to renew lock: %w", result.Error) + } + + if result.RowsAffected == 0 { + return ErrLockNotHeld + } + + logger.Debugf("Lock renewed: key=%s, owner=%s, new_expire_at=%v", key, g.owner, newExpireAt) + return nil +} + +// IsLocked checks if a lock is currently held (by anyone) +func (g *GormLock) IsLocked(ctx context.Context, key string) (bool, error) { + db := g.pool.GetDB().WithContext(ctx) + + var count int64 + err := db.Model(&LockRecord{}). + Where("lock_key = ? AND expire_at > ?", key, time.Now()). + Count(&count).Error + + if err != nil { + return false, fmt.Errorf("failed to check lock status: %w", err) + } + + return count > 0, nil +} + +// WithLock executes a function while holding a lock +// It automatically acquires the lock, executes the function, and releases the lock +// If TTL is longer than 10 seconds, it will automatically renew the lock until the function completes +func (g *GormLock) WithLock(ctx context.Context, key string, ttl time.Duration, fn func() error) error { + // Acquire lock + if err := g.Lock(ctx, key, ttl); err != nil { + return fmt.Errorf("failed to acquire lock: %w", err) + } + + // Ensure lock is released + defer func() { + // Use background context for unlock to ensure it completes even if ctx is cancelled + unlockCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := g.Unlock(unlockCtx, key); err != nil { + logger.Errorf("Failed to release lock %s: %v", key, err) + } + }() + + // Start auto-renewal if TTL is long enough + var renewDone chan struct{} + if ttl > 10*time.Second { + renewDone = make(chan struct{}) + go g.autoRenew(ctx, key, ttl, renewDone) + defer close(renewDone) + } + + // Execute the function + return fn() +} + +// autoRenew periodically renews the lock until done channel is closed +func (g *GormLock) autoRenew(ctx context.Context, key string, ttl time.Duration, done <-chan struct{}) { + // Renew at 1/3 of TTL to ensure lock doesn't expire + renewInterval := ttl / 3 + ticker := time.NewTicker(renewInterval) + defer ticker.Stop() + + logger.Debugf("Auto-renewal started for lock %s (interval: %v)", key, renewInterval) + + for { + select { + case <-done: + logger.Debugf("Auto-renewal stopped for lock %s (done signal)", key) + return + case <-ctx.Done(): + logger.Debugf("Auto-renewal stopped for lock %s (context cancelled)", key) + return + case <-ticker.C: + renewCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := g.Renew(renewCtx, key, ttl); err != nil { + logger.Warnf("Failed to renew lock %s: %v", key, err) + cancel() + return + } + cancel() + logger.Debugf("Lock %s renewed successfully", key) + } + } +} + +// CleanupExpiredLocks removes all expired locks from the database +// This should be called periodically as a maintenance task +func (g *GormLock) CleanupExpiredLocks(ctx context.Context) error { + db := g.pool.GetDB().WithContext(ctx) + + result := db.Where("expire_at < ?", time.Now()).Delete(&LockRecord{}) + if result.Error != nil { + return fmt.Errorf("failed to cleanup expired locks: %w", result.Error) + } + + if result.RowsAffected > 0 { + logger.Infof("Cleaned up %d expired locks", result.RowsAffected) + } + + return nil +} Review Comment: The distributed lock implementation lacks any test coverage. Given that this is a critical component for coordinating concurrent operations in a distributed system, comprehensive tests are essential. The tests should cover: basic lock/unlock operations, concurrent lock attempts, lock expiration, lock renewal, TTL handling, database transaction failures, and the ON CONFLICT behavior. The codebase demonstrates comprehensive testing patterns in pkg/store/dbcommon/ that should be followed for this new feature. ########## pkg/core/lock/gorm_lock.go: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +// Ensure GormLock implements Lock interface +var _ Lock = (*GormLock)(nil) + +// GormLock provides distributed locking using database as backend +// It uses GORM for database operations and supports MySQL, PostgreSQL, etc. +type GormLock struct { + pool *dbcommon.ConnectionPool + owner string // Unique identifier for this lock instance +} + +// NewGormLock creates a new GORM-based distributed lock instance +func NewGormLock(pool *dbcommon.ConnectionPool) Lock { + return &GormLock{ + pool: pool, + owner: uuid.New().String(), + } +} + +// Lock acquires a lock with the specified key and TTL +// It blocks until the lock is acquired or context is cancelled +func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) error { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + acquired, err := g.TryLock(ctx, key, ttl) + if err != nil { + return fmt.Errorf("failed to try lock: %w", err) + } + if acquired { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +// TryLock attempts to acquire a lock without blocking +// Returns true if lock was acquired, false otherwise +func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) { + db := g.pool.GetDB().WithContext(ctx) + expireAt := time.Now().Add(ttl) + + var acquired bool + err := db.Transaction(func(tx *gorm.DB) error { + // Clean up only this key's expired lock to improve performance + now := time.Now() + if err := tx.Where("lock_key = ? AND expire_at < ?", key, now). + Delete(&LockRecord{}).Error; err != nil { + return fmt.Errorf("failed to clean expired lock for key %s: %w", key, err) + } + + // Try to acquire lock using INSERT ... ON CONFLICT + lock := &LockRecord{ + LockKey: key, + Owner: g.owner, + ExpireAt: expireAt, + } + + // Try to insert the lock record + result := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "lock_key"}}, + DoNothing: true, // If conflict, do nothing + }).Create(lock) Review Comment: The comment says "Try to acquire lock using INSERT ... ON CONFLICT" but the implementation only works correctly for databases that support the ON CONFLICT clause (PostgreSQL, SQLite). MySQL uses different syntax (INSERT ... ON DUPLICATE KEY UPDATE or INSERT IGNORE) and may not work as expected with this GORM clause. This should be documented or handled differently for cross-database compatibility. ```suggestion // Try to acquire lock using dialect-specific upsert/ignore semantics. // For PostgreSQL/SQLite: INSERT ... ON CONFLICT DO NOTHING // For MySQL: INSERT IGNORE or INSERT ... ON DUPLICATE KEY UPDATE lock := &LockRecord{ LockKey: key, Owner: g.owner, ExpireAt: expireAt, } var result *gorm.DB switch tx.Dialector.Name() { case "mysql": // Use INSERT IGNORE for MySQL result = tx.Clauses(clause.Insert{Modifier: "IGNORE"}).Create(lock) case "sqlite", "postgres": // Use ON CONFLICT DO NOTHING for SQLite/Postgres result = tx.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "lock_key"}}, DoNothing: true, // If conflict, do nothing }).Create(lock) default: return fmt.Errorf("unsupported database dialect: %s", tx.Dialector.Name()) } ``` ########## pkg/console/service/tag_rule.go: ########## @@ -38,6 +41,20 @@ func GetTagRule(ctx consolectx.Context, name string, mesh string) (*meshresource } func UpdateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) error { + lock := ctx.LockManager() + if lock == nil { + return updateTagRuleUnsafe(ctx, res) + } + + lockKey := fmt.Sprintf("tag_route:%s:%s", res.Mesh, res.Name) + lockTimeout := 30 * time.Second Review Comment: The lock timeout of 30 seconds is hardcoded here and in multiple other places (configurator_rule.go, condition_rule.go). Consider defining this as a constant (e.g., `const DefaultLockTimeout = 30 * time.Second`) to maintain consistency and make it easier to adjust the timeout value across all lock operations. ########## pkg/core/lock/component.go: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "time" + + "github.com/pkg/errors" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +const ( + // DistributedLockComponent is the component type for distributed lock + DistributedLockComponent runtime.ComponentType = "distributed lock" +) + +// Component implements the runtime.Component interface for distributed lock +type Component struct { + lock Lock +} + +// NewComponent creates a new distributed lock component +func NewComponent() *Component { + return &Component{} +} + +// Type returns the component type +func (c *Component) Type() runtime.ComponentType { + return DistributedLockComponent +} + +// Order indicates the initialization order +// Lock should be initialized after Store (Order 100) but before other services +func (c *Component) Order() int { + return 90 // After Store, before Console Review Comment: The component order is set to 90, which is meant to be "After Store, before Console". However, the Store component has order `math.MaxInt - 1` (see pkg/core/store/component.go:63), which is a very high number (higher values start first according to runtime/component.go:44). This means the lock component will actually start BEFORE the Store component, not after. The order value should be higher than the Store's order (e.g., `math.MaxInt - 2` or similar) to ensure proper initialization sequence. ########## pkg/core/lock/gorm_lock.go: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +// Ensure GormLock implements Lock interface +var _ Lock = (*GormLock)(nil) + +// GormLock provides distributed locking using database as backend +// It uses GORM for database operations and supports MySQL, PostgreSQL, etc. +type GormLock struct { + pool *dbcommon.ConnectionPool + owner string // Unique identifier for this lock instance +} + +// NewGormLock creates a new GORM-based distributed lock instance +func NewGormLock(pool *dbcommon.ConnectionPool) Lock { + return &GormLock{ + pool: pool, + owner: uuid.New().String(), + } +} + +// Lock acquires a lock with the specified key and TTL +// It blocks until the lock is acquired or context is cancelled +func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) error { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + acquired, err := g.TryLock(ctx, key, ttl) + if err != nil { + return fmt.Errorf("failed to try lock: %w", err) + } + if acquired { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +// TryLock attempts to acquire a lock without blocking +// Returns true if lock was acquired, false otherwise +func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) { + db := g.pool.GetDB().WithContext(ctx) + expireAt := time.Now().Add(ttl) + + var acquired bool + err := db.Transaction(func(tx *gorm.DB) error { + // Clean up only this key's expired lock to improve performance + now := time.Now() + if err := tx.Where("lock_key = ? AND expire_at < ?", key, now). + Delete(&LockRecord{}).Error; err != nil { + return fmt.Errorf("failed to clean expired lock for key %s: %w", key, err) + } + + // Try to acquire lock using INSERT ... ON CONFLICT + lock := &LockRecord{ + LockKey: key, + Owner: g.owner, + ExpireAt: expireAt, + } + + // Try to insert the lock record + result := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "lock_key"}}, + DoNothing: true, // If conflict, do nothing + }).Create(lock) + + if result.Error != nil { + return fmt.Errorf("failed to insert lock record: %w", result.Error) + } + + // Check if we got the lock by verifying the owner + var existingLock LockRecord + if err := tx.Where("lock_key = ? ", key).First(&existingLock).Error; err != nil { + return fmt.Errorf("failed to verify lock ownership: %w", err) + } + + // Determine if we acquired the lock + acquired = existingLock.Owner == g.owner Review Comment: There's a potential race condition here. After inserting with ON CONFLICT DO NOTHING (line 97-100), another process could have acquired the lock, but before you verify ownership with the SELECT query (line 108), that other process could release the lock and a third process could acquire it. This would cause incorrect lock ownership detection. Consider using a SELECT ... FOR UPDATE or checking if result.RowsAffected > 0 from the INSERT to determine if you acquired the lock (RowsAffected would be 0 if the conflict clause prevented the insert). ```suggestion // Determine if we acquired the lock by checking RowsAffected acquired = result.RowsAffected > 0 ``` ########## pkg/core/bootstrap/bootstrap.go: ########## @@ -42,27 +43,32 @@ func Bootstrap(appCtx context.Context, cfg app.AdminConfig) (runtime.Runtime, er if err := initResourceStore(cfg, builder); err != nil { return nil, err } - // 2. initialize discovery + // 2. initialize distributed lock + if err := initDistributedLock(builder); err != nil { + // Don't fail bootstrap if lock initialization fails + logger.Warnf("Failed to initialize distributed lock: %v", err) Review Comment: The distributed lock initialization failure is caught and logged as a warning without failing the bootstrap. However, the comment says "Don't fail bootstrap if lock initialization fails", but the actual code flow continues to initialize other components that may depend on the lock (like the services in pkg/console/service/). This could lead to subtle bugs where operations that require locking proceed without protection. Consider either (1) making lock initialization required and failing bootstrap if it fails, or (2) ensuring all consumers of the lock check for nil and handle the absence gracefully (which they do, but this should be documented). ```suggestion return nil, errors.Wrap(err, "failed to initialize distributed lock") ``` ########## pkg/core/lock/gorm_lock.go: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +// Ensure GormLock implements Lock interface +var _ Lock = (*GormLock)(nil) + +// GormLock provides distributed locking using database as backend +// It uses GORM for database operations and supports MySQL, PostgreSQL, etc. +type GormLock struct { + pool *dbcommon.ConnectionPool + owner string // Unique identifier for this lock instance +} + +// NewGormLock creates a new GORM-based distributed lock instance +func NewGormLock(pool *dbcommon.ConnectionPool) Lock { + return &GormLock{ + pool: pool, + owner: uuid.New().String(), + } +} + +// Lock acquires a lock with the specified key and TTL +// It blocks until the lock is acquired or context is cancelled +func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) error { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + acquired, err := g.TryLock(ctx, key, ttl) + if err != nil { + return fmt.Errorf("failed to try lock: %w", err) + } + if acquired { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +// TryLock attempts to acquire a lock without blocking +// Returns true if lock was acquired, false otherwise +func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) { + db := g.pool.GetDB().WithContext(ctx) + expireAt := time.Now().Add(ttl) + + var acquired bool + err := db.Transaction(func(tx *gorm.DB) error { + // Clean up only this key's expired lock to improve performance + now := time.Now() + if err := tx.Where("lock_key = ? AND expire_at < ?", key, now). + Delete(&LockRecord{}).Error; err != nil { + return fmt.Errorf("failed to clean expired lock for key %s: %w", key, err) + } + + // Try to acquire lock using INSERT ... ON CONFLICT + lock := &LockRecord{ + LockKey: key, + Owner: g.owner, + ExpireAt: expireAt, + } + + // Try to insert the lock record + result := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "lock_key"}}, + DoNothing: true, // If conflict, do nothing + }).Create(lock) + + if result.Error != nil { + return fmt.Errorf("failed to insert lock record: %w", result.Error) + } + + // Check if we got the lock by verifying the owner + var existingLock LockRecord + if err := tx.Where("lock_key = ? ", key).First(&existingLock).Error; err != nil { Review Comment: Extra space in the WHERE clause ("lock_key = ? " has a trailing space before the closing quote). While this doesn't affect functionality, it's inconsistent with other queries in the file. ```suggestion if err := tx.Where("lock_key = ?", key).First(&existingLock).Error; err != nil { ``` ########## pkg/console/service/configurator_rule.go: ########## @@ -54,9 +85,19 @@ func CreateConfigurator(ctx consolectx.Context, name string, res *meshresource.D } func DeleteConfigurator(ctx consolectx.Context, name string, mesh string) error { - if err := ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)); err != nil { - logger.Warnf("delete %s configurator failed with error: %s", name, err.Error()) - return err + lock := ctx.LockManager() + if lock == nil { + return ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)) } - return nil + + lockKey := fmt.Sprintf("dynamic_config:%s:%s", mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + if err := ctx.ResourceManager().DeleteByKey(meshresource.DynamicConfigKind, coremodel.BuildResourceKey(mesh, name)); err != nil { + logger.Warnf("delete %s configurator failed with error: %s", name, err.Error()) Review Comment: Extra space before %s - there are two spaces instead of one. ```suggestion logger.Warnf("delete %s configurator failed with error: %s", name, err.Error()) ``` ########## pkg/console/service/tag_rule.go: ########## @@ -38,6 +41,20 @@ func GetTagRule(ctx consolectx.Context, name string, mesh string) (*meshresource } func UpdateTagRule(ctx consolectx.Context, res *meshresource.TagRouteResource) error { + lock := ctx.LockManager() + if lock == nil { + return updateTagRuleUnsafe(ctx, res) + } + + lockKey := fmt.Sprintf("tag_route:%s:%s", res.Mesh, res.Name) Review Comment: The lock key generation pattern `fmt.Sprintf("tag_route:%s:%s", res.Mesh, res.Name)` is duplicated across UpdateTagRule, CreateTagRule, and DeleteTagRule functions. Consider extracting this to a helper function like `getTagRouteLockKey(mesh, name string)` to maintain consistency and make it easier to change the lock key pattern in the future. The same applies to other service files with similar patterns (configurator_rule.go and condition_rule.go). ########## pkg/console/service/condition_rule.go: ########## @@ -73,14 +76,46 @@ func GetConditionRule(ctx context.Context, name string, mesh string) (*meshresou } func UpdateConditionRule(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { + lock := ctx.LockManager() + if lock == nil { + // Lock not available, proceed without lock protection + return updateConditionRuleUnsafe(ctx, name, res) + } + + // Use distributed lock to prevent concurrent modifications + lockKey := fmt.Sprintf("condition_route:%s:%s", res.Mesh, name) + lockTimeout := 30 * time.Second + + return lock.WithLock(ctx.AppContext(), lockKey, lockTimeout, func() error { + return updateConditionRuleUnsafe(ctx, name, res) + }) +} + +func updateConditionRuleUnsafe(ctx context.Context, name string, res *meshresource.ConditionRouteResource) error { if err := ctx.ResourceManager().Update(res); err != nil { logger.Warnf("update %s condition failed with error: %s", name, err.Error()) return err } + logger.Infof("Condition route %s updated successfully", name) Review Comment: The log message "Condition route %s updated successfully" is added in the new code, but there's no corresponding success log message in the original code or in the CreateConditionRule function. This inconsistency makes it harder to trace operations. Consider either adding similar log messages to other operations (create, delete) or removing this one to maintain consistency with the rest of the codebase. ########## pkg/core/lock/gorm_lock.go: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package lock + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" + "gorm.io/gorm/clause" + + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +// Ensure GormLock implements Lock interface +var _ Lock = (*GormLock)(nil) + +// GormLock provides distributed locking using database as backend +// It uses GORM for database operations and supports MySQL, PostgreSQL, etc. +type GormLock struct { + pool *dbcommon.ConnectionPool + owner string // Unique identifier for this lock instance +} + +// NewGormLock creates a new GORM-based distributed lock instance +func NewGormLock(pool *dbcommon.ConnectionPool) Lock { + return &GormLock{ + pool: pool, + owner: uuid.New().String(), + } +} + +// Lock acquires a lock with the specified key and TTL +// It blocks until the lock is acquired or context is cancelled +func (g *GormLock) Lock(ctx context.Context, key string, ttl time.Duration) error { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + acquired, err := g.TryLock(ctx, key, ttl) + if err != nil { + return fmt.Errorf("failed to try lock: %w", err) + } + if acquired { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + } +} + +// TryLock attempts to acquire a lock without blocking +// Returns true if lock was acquired, false otherwise +func (g *GormLock) TryLock(ctx context.Context, key string, ttl time.Duration) (bool, error) { + db := g.pool.GetDB().WithContext(ctx) + expireAt := time.Now().Add(ttl) + + var acquired bool + err := db.Transaction(func(tx *gorm.DB) error { + // Clean up only this key's expired lock to improve performance + now := time.Now() + if err := tx.Where("lock_key = ? AND expire_at < ?", key, now). + Delete(&LockRecord{}).Error; err != nil { + return fmt.Errorf("failed to clean expired lock for key %s: %w", key, err) + } + + // Try to acquire lock using INSERT ... ON CONFLICT + lock := &LockRecord{ + LockKey: key, + Owner: g.owner, + ExpireAt: expireAt, + } + + // Try to insert the lock record + result := tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "lock_key"}}, + DoNothing: true, // If conflict, do nothing + }).Create(lock) + + if result.Error != nil { + return fmt.Errorf("failed to insert lock record: %w", result.Error) + } + + // Check if we got the lock by verifying the owner + var existingLock LockRecord + if err := tx.Where("lock_key = ? ", key).First(&existingLock).Error; err != nil { + return fmt.Errorf("failed to verify lock ownership: %w", err) Review Comment: Extra space before %w - there are two spaces instead of one. ```suggestion return fmt.Errorf("failed to insert lock record: %w", result.Error) } // Check if we got the lock by verifying the owner var existingLock LockRecord if err := tx.Where("lock_key = ? ", key).First(&existingLock).Error; err != nil { return fmt.Errorf("failed to verify lock ownership: %w", err) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
