Copilot commented on code in PR #1360: URL: https://github.com/apache/dubbo-admin/pull/1360#discussion_r2573049418
########## pkg/store/dbcommon/gorm_store.go: ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dbcommon + +import ( + "errors" + "fmt" + "reflect" + "sort" + + "gorm.io/gorm" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +// GormStore is a GORM-backed store implementation for Dubbo resources +// It uses GORM for database operations and maintains in-memory indices for fast lookups +// This implementation is database-agnostic and works with any GORM-supported database +type GormStore struct { + pool *ConnectionPool // Shared connection pool with reference counting + kind model.ResourceKind + address string + indices *Index // In-memory index with thread-safe operations + stopCh chan struct{} +} + +var _ store.ManagedResourceStore = &GormStore{} + +// NewGormStore creates a new GORM store for the specified resource kind +func NewGormStore(kind model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return 
&GormStore{ + kind: kind, + address: address, + pool: pool, + indices: NewIndex(), + stopCh: make(chan struct{}), + } +} + +// Init initializes the GORM store by migrating the schema +func (gs *GormStore) Init(_ runtime.BuilderContext) error { + // Perform table migration + db := gs.pool.GetDB() + modelForMigration := &ResourceModel{ResourceKind: gs.kind.ToString()} + if err := db.AutoMigrate(modelForMigration); err != nil { + return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) + } + + logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) + return nil +} + +// Start starts the GORM store and monitors for shutdown signal +func (gs *GormStore) Start(_ runtime.Runtime, stopCh <-chan struct{}) error { + logger.Infof("GORM store started for resource kind: %s", gs.kind.ToString()) + + // Monitor stop channel for graceful shutdown in a goroutine + go func() { + <-stopCh + logger.Infof("GORM store for %s received stop signal, initiating graceful shutdown", gs.kind.ToString()) + + // Close the internal stop channel to signal any ongoing operations + close(gs.stopCh) + + // Decrement the reference count and potentially close the connection pool + if gs.pool != nil { + if err := gs.pool.Close(); err != nil { + logger.Errorf("Failed to close connection pool for %s: %v", gs.kind.ToString(), err) + } else { + logger.Infof("GORM store for %s shutdown completed", gs.kind.ToString()) + } + } + }() + + return nil +} + +// Add inserts a new resource into the database +func (gs *GormStore) Add(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + var count int64 + db := gs.pool.GetDB() + err := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Count(&count).Error + if err != nil { + return err + } + if count > 0 { + return store.ErrorResourceAlreadyExists( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + if err := db.Create(m).Error; err != nil { + return err + } + + // Update indices after successful DB operation + gs.indices.UpdateResource(resource, nil) + + return nil +} + +// Update modifies an existing resource in the database +func (gs *GormStore) Update(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + // Get old resource for index update + oldResource, exists, err := gs.GetByKey(resource.ResourceKey()) + if err != nil { + return err + } + if !exists { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + db := gs.pool.GetDB() + result := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Updates(map[string]interface{}{ + "data": m.Data, + "updated_at": m.UpdatedAt, + }) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Update indices: remove old and add new + gs.indices.UpdateResource(resource, oldResource.(model.Resource)) + + return nil +} + +// Delete removes a resource from the database +func (gs *GormStore) Delete(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + db := gs.pool.GetDB() + result := db.Where("resource_key = ?", resource.ResourceKey()). + Delete(&ResourceModel{}) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Remove from indices + gs.indices.RemoveResource(resource) + + return nil +} + +// List returns all resources of the configured kind from the database +func (gs *GormStore) List() []interface{} { + var models []ResourceModel + db := gs.pool.GetDB() + if err := db.Find(&models).Error; err != nil { + logger.Errorf("failed to list resources: %v", err) + return []interface{}{} + } + + result := make([]interface{}, 0, len(models)) + for _, m := range models { + resource, err := m.ToResource() + if err != nil { + logger.Errorf("failed to deserialize resource: %v", err) + continue + } + result = append(result, resource) + } + return result +} + +// ListKeys returns all resource keys of the configured kind from the database +func (gs *GormStore) ListKeys() []string { + var keys []string + db := gs.pool.GetDB() + db.Model(&ResourceModel{}). 
+ Pluck("resource_key", &keys) + return keys +} + +// Get retrieves a resource by its object reference +func (gs *GormStore) Get(obj interface{}) (item interface{}, exists bool, err error) { + resource, ok := obj.(model.Resource) + if !ok { + return nil, false, bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + return gs.GetByKey(resource.ResourceKey()) +} + +// GetByKey retrieves a resource by its unique key +func (gs *GormStore) GetByKey(key string) (item interface{}, exists bool, err error) { + var m ResourceModel + db := gs.pool.GetDB() + result := db.Where("resource_key = ?", key). + First(&m) + + if result.Error != nil { + if errors.Is(result.Error, gorm.ErrRecordNotFound) { + return nil, false, nil + } + return nil, false, result.Error + } + + resource, err := m.ToResource() + if err != nil { + return nil, false, err + } + + return resource, true, nil +} + +// Replace atomically replaces all resources in the database with the provided list +func (gs *GormStore) Replace(list []interface{}, _ string) error { + db := gs.pool.GetDB() + return db.Transaction(func(tx *gorm.DB) error { + // Delete all existing records for this resource kind + if err := tx.Delete(&ResourceModel{}, "1=1").Error; err != nil { + return err + } + + // Clear all indices + gs.clearIndices() + + // Return early if list is empty + if len(list) == 0 { + return nil + } + + // Convert all resources to ResourceModel + models := make([]*ResourceModel, 0, len(list)) + resources := make([]model.Resource, 0, len(list)) + for _, obj := range list { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + models = append(models, m) + resources = append(resources, resource) + } + + // Batch insert all models at once + if err := tx.CreateInBatches(models, 100).Error; err != nil { + return err + } + + // Rebuild indices for all resources 
+ for _, resource := range resources { + gs.indices.UpdateResource(resource, nil) + } + + return nil + }) +} + +func (gs *GormStore) Resync() error { + return nil +} + +func (gs *GormStore) Index(indexName string, obj interface{}) ([]interface{}, error) { + if !gs.indices.IndexExists(indexName) { + return nil, fmt.Errorf("index %s does not exist", indexName) + } + + indexFunc := gs.indices.GetIndexers()[indexName] + indexValues, err := indexFunc(obj) + if err != nil { + return nil, err + } + + if len(indexValues) == 0 { + return []interface{}{}, nil + } + + return gs.findByIndex(indexName, indexValues[0]) +} + +func (gs *GormStore) IndexKeys(indexName, indexedValue string) ([]string, error) { + if !gs.indices.IndexExists(indexName) { + return nil, fmt.Errorf("index %s does not exist", indexName) + } + + resources, err := gs.findByIndex(indexName, indexedValue) + if err != nil { + return nil, err + } + + keys := make([]string, 0, len(resources)) + for _, obj := range resources { + if resource, ok := obj.(model.Resource); ok { + keys = append(keys, resource.ResourceKey()) + } + } + + return keys, nil +} + +func (gs *GormStore) ListIndexFuncValues(indexName string) []string { + if !gs.indices.IndexExists(indexName) { + return []string{} + } + + return gs.indices.ListIndexFuncValues(indexName) +} + +func (gs *GormStore) ByIndex(indexName, indexedValue string) ([]interface{}, error) { + if !gs.indices.IndexExists(indexName) { + return nil, fmt.Errorf("index %s does not exist", indexName) + } + + return gs.findByIndex(indexName, indexedValue) +} + +func (gs *GormStore) GetIndexers() cache.Indexers { + return gs.indices.GetIndexers() +} + +func (gs *GormStore) AddIndexers(newIndexers cache.Indexers) error { + return gs.indices.AddIndexers(newIndexers) +} + +func (gs *GormStore) GetByKeys(keys []string) ([]model.Resource, error) { + if len(keys) == 0 { + return []model.Resource{}, nil + } + + var models []ResourceModel + db := gs.pool.GetDB() + err := 
db.Where("resource_key IN ?", keys). + Find(&models).Error + if err != nil { + return nil, err + } + + resources := make([]model.Resource, 0, len(models)) + for _, m := range models { + resource, err := m.ToResource() + if err != nil { + return nil, err + } + resources = append(resources, resource) + } + + return resources, nil +} + +func (gs *GormStore) ListByIndexes(indexes map[string]string) ([]model.Resource, error) { + keys, err := gs.getKeysByIndexes(indexes) + if err != nil { + return nil, err + } + + resources, err := gs.GetByKeys(keys) + if err != nil { + return nil, err + } + + sort.Slice(resources, func(i, j int) bool { + return resources[i].ResourceKey() < resources[j].ResourceKey() + }) + + return resources, nil +} + +func (gs *GormStore) PageListByIndexes(indexes map[string]string, pq model.PageReq) (*model.PageData[model.Resource], error) { + keys, err := gs.getKeysByIndexes(indexes) + if err != nil { + return nil, err + } + + sort.Strings(keys) + total := len(keys) + + if pq.PageOffset >= total { + return model.NewPageData(total, pq.PageOffset, pq.PageSize, []model.Resource{}), nil + } + + end := pq.PageOffset + pq.PageSize + if end > total { + end = total + } + + pageKeys := keys[pq.PageOffset:end] + resources, err := gs.GetByKeys(pageKeys) + if err != nil { + return nil, err + } + + return model.NewPageData(total, pq.PageOffset, pq.PageSize, resources), nil +} + +func (gs *GormStore) findByIndex(indexName, indexedValue string) ([]interface{}, error) { + if !gs.indices.IndexExists(indexName) { + return nil, fmt.Errorf("index %s does not exist", indexName) + } + + // Get resource keys from in-memory index + keys := gs.indices.GetKeys(indexName, indexedValue) + + if len(keys) == 0 { + return []interface{}{}, nil + } + + // Fetch resources from DB by keys + resources, err := gs.GetByKeys(keys) + if err != nil { + return nil, err + } + + // Convert to []interface{} + result := make([]interface{}, len(resources)) + for i, resource := range resources { + 
result[i] = resource + } + + return result, nil +} + +func (gs *GormStore) getKeysByIndexes(indexes map[string]string) ([]string, error) { + if len(indexes) == 0 { + return gs.ListKeys(), nil + } + + var keySet map[string]struct{} + first := true + + for indexName, indexValue := range indexes { + keys, err := gs.IndexKeys(indexName, indexValue) + if err != nil { + return nil, err + } + + if first { + keySet = make(map[string]struct{}, len(keys)) + for _, key := range keys { + keySet[key] = struct{}{} + } + first = false + } else { + nextSet := make(map[string]struct{}, len(keys)) + for _, key := range keys { + if _, exists := keySet[key]; exists { + nextSet[key] = struct{}{} + } + } + keySet = nextSet + } + } + + result := make([]string, 0, len(keySet)) + for key := range keySet { + result = append(result, key) + } + + return result, nil +} + +// clearIndices clears all in-memory indices +func (gs *GormStore) clearIndices() { + gs.indices.Clear() +} Review Comment: The core database store logic (GormStore) lacks test coverage. Since the memory store has comprehensive tests (see `pkg/store/memory/store_test.go`), similar test coverage should be added for GormStore to verify CRUD operations, index management, connection pooling, transaction handling, and error cases. ########## pkg/store/dbcommon/connection_pool.go: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dbcommon + +import ( + "database/sql" + "fmt" + "sync" + "time" + + "gorm.io/gorm" + + storecfg "github.com/apache/dubbo-admin/pkg/config/store" + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +var ( + // pools stores all connection pools indexed by a unique key (storeType:address) + pools = make(map[string]*ConnectionPool) + // poolsMutex protects concurrent access to the pools map + poolsMutex sync.RWMutex +) + +// ConnectionPoolConfig defines connection pool configuration +type ConnectionPoolConfig struct { + MaxIdleConns int // Maximum number of idle connections + MaxOpenConns int // Maximum number of open connections + ConnMaxLifetime time.Duration // Maximum lifetime of a connection + ConnMaxIdleTime time.Duration // Maximum idle time of a connection +} + +// ConnectionPool manages database connections with connection pooling +type ConnectionPool struct { + db *gorm.DB + sqlDB *sql.DB + address string + storeType storecfg.Type + mu sync.RWMutex + refCount int // Reference counter for the number of stores using this pool + closeOnce sync.Once // Ensure Close is called only once + closed bool // Track if the pool is closed +} + +// GetOrCreatePool returns or creates a connection pool for the given store type and address +// It implements a singleton pattern with reference counting to allow pool reuse across multiple stores +// If a pool already exists for the same storeType and address, it increments the reference count and returns the existing pool +// Otherwise, it creates a new pool with the provided dialector +func 
GetOrCreatePool(dialector gorm.Dialector, storeType storecfg.Type, address string, config *ConnectionPoolConfig) (*ConnectionPool, error) { + if storeType == storecfg.Memory { + return nil, fmt.Errorf("memory pool store is no need to create connection pool") + } + + poolKey := fmt.Sprintf("%s:%s", storeType, address) + + poolsMutex.Lock() + defer poolsMutex.Unlock() + + // Check if pool already exists + if existingPool, exists := pools[poolKey]; exists { + // Increment reference count when reusing existing pool + existingPool.IncrementRef() + logger.Infof("Reusing %s connection pool: address=%s, refCount=%d", storeType, address, existingPool.RefCount()) + return existingPool, nil + } + + // Create new pool + if config == nil { + config = DefaultConnectionPoolConfig() + } + + pool, err := NewConnectionPool(dialector, storeType, address, config) + if err != nil { + return nil, err + } + + // Store the pool + pools[poolKey] = pool + logger.Infof("%s connection pool created successfully: address=%s, maxIdleConns=%d, maxOpenConns=%d", + storeType, address, config.MaxIdleConns, config.MaxOpenConns) + + return pool, nil +} + +// RemovePool removes a pool from the global registry +// This should only be called when the pool's reference count reaches zero +func RemovePool(storeType storecfg.Type, address string) { + poolKey := fmt.Sprintf("%s:%s", storeType, address) + + poolsMutex.Lock() + defer poolsMutex.Unlock() + + delete(pools, poolKey) + logger.Infof("Removed %s connection pool from registry: address=%s", storeType, address) +} + +// DefaultConnectionPoolConfig returns default connection pool configuration +func DefaultConnectionPoolConfig() *ConnectionPoolConfig { + return &ConnectionPoolConfig{ + MaxIdleConns: 10, // Default: 10 idle connections + MaxOpenConns: 100, // Default: 100 max open connections + ConnMaxLifetime: time.Hour, // Default: 1 hour max lifetime + ConnMaxIdleTime: 10 * time.Minute, // Default: 10 minutes max idle time + } +} + +// 
NewConnectionPool creates a new connection pool +func NewConnectionPool(dialector gorm.Dialector, storeType storecfg.Type, address string, config *ConnectionPoolConfig) (*ConnectionPool, error) { + db, err := gorm.Open(dialector, &gorm.Config{}) + if err != nil { + return nil, fmt.Errorf("failed to connect to %s: %w", storeType, err) + } + + sqlDB, err := db.DB() + if err != nil { + return nil, fmt.Errorf("failed to get underlying sql.DB: %w", err) + } + + // Configure connection pool + sqlDB.SetMaxIdleConns(config.MaxIdleConns) + sqlDB.SetMaxOpenConns(config.MaxOpenConns) + sqlDB.SetConnMaxLifetime(config.ConnMaxLifetime) + sqlDB.SetConnMaxIdleTime(config.ConnMaxIdleTime) + + return &ConnectionPool{ + db: db, + sqlDB: sqlDB, + address: address, + storeType: storeType, + refCount: 1, // Initial reference count + }, nil +} + +// GetDB returns the gorm.DB instance +func (p *ConnectionPool) GetDB() *gorm.DB { + p.mu.RLock() + defer p.mu.RUnlock() + return p.db +} + +// Address returns the connection address +func (p *ConnectionPool) Address() string { + p.mu.RLock() + defer p.mu.RUnlock() + return p.address +} + +// RefCount returns the current reference count +func (p *ConnectionPool) RefCount() int { + p.mu.RLock() + defer p.mu.RUnlock() + return p.refCount +} + +// IncrementRef increments the reference count +func (p *ConnectionPool) IncrementRef() { + p.mu.Lock() + defer p.mu.Unlock() + p.refCount++ +} + +// Close closes the connection pool gracefully with reference counting +// The pool is only actually closed when refCount reaches 0 +func (p *ConnectionPool) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return nil // Already closed + } + + p.refCount-- + logger.Infof("Decremented %s connection pool refCount: address=%s, refCount=%d", p.storeType, p.address, p.refCount) + + // Only close the pool when no stores are using it + if p.refCount <= 0 { + var closeErr error + p.closeOnce.Do(func() { + if p.sqlDB != nil { + logger.Infof("Closing 
%s connection pool: address=%s", p.storeType, p.address) + closeErr = p.sqlDB.Close() + p.closed = true + } Review Comment: The `RemovePool` function is defined but never called. When `refCount` reaches 0 in `Close()`, the pool should be removed from the global `pools` map to prevent memory leaks. Add `RemovePool(p.storeType, p.address)` inside the `closeOnce.Do` block after closing the database connection. Note that `Close()` still holds `p.mu` at that point while `RemovePool` acquires `poolsMutex`, and `GetOrCreatePool` takes the two locks in the opposite order (`poolsMutex` first, then `p.mu` via `IncrementRef()`), so the removal must follow a single lock order or the two paths can deadlock. ```suggestion } RemovePool(p.storeType, p.address) ``` ########## pkg/store/dbcommon/gorm_store.go: ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package dbcommon + +import ( + "errors" + "fmt" + "reflect" + "sort" + + "gorm.io/gorm" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +// GormStore is a GORM-backed store implementation for Dubbo resources +// It uses GORM for database operations and maintains in-memory indices for fast lookups +// This implementation is database-agnostic and works with any GORM-supported database +type GormStore struct { + pool *ConnectionPool // Shared connection pool with reference counting + kind model.ResourceKind + address string + indices *Index // In-memory index with thread-safe operations + stopCh chan struct{} +} + +var _ store.ManagedResourceStore = &GormStore{} + +// NewGormStore creates a new GORM store for the specified resource kind +func NewGormStore(kind model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return &GormStore{ + kind: kind, + address: address, + pool: pool, + indices: NewIndex(), + stopCh: make(chan struct{}), + } +} + +// Init initializes the GORM store by migrating the schema +func (gs *GormStore) Init(_ runtime.BuilderContext) error { + // Perform table migration + db := gs.pool.GetDB() + modelForMigration := &ResourceModel{ResourceKind: gs.kind.ToString()} + if err := db.AutoMigrate(modelForMigration); err != nil { + return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) + } + + logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) + return nil +} + +// Start starts the GORM store and monitors for shutdown signal +func (gs *GormStore) Start(_ runtime.Runtime, stopCh <-chan struct{}) error { + logger.Infof("GORM store started for resource kind: %s", gs.kind.ToString()) + + // Monitor stop channel for graceful 
shutdown in a goroutine + go func() { + <-stopCh + logger.Infof("GORM store for %s received stop signal, initiating graceful shutdown", gs.kind.ToString()) + + // Close the internal stop channel to signal any ongoing operations + close(gs.stopCh) + + // Decrement the reference count and potentially close the connection pool + if gs.pool != nil { + if err := gs.pool.Close(); err != nil { + logger.Errorf("Failed to close connection pool for %s: %v", gs.kind.ToString(), err) + } else { + logger.Infof("GORM store for %s shutdown completed", gs.kind.ToString()) + } + } + }() + + return nil +} + +// Add inserts a new resource into the database +func (gs *GormStore) Add(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + var count int64 + db := gs.pool.GetDB() + err := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Count(&count).Error + if err != nil { + return err + } + if count > 0 { + return store.ErrorResourceAlreadyExists( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + if err := db.Create(m).Error; err != nil { + return err + } + + // Update indices after successful DB operation + gs.indices.UpdateResource(resource, nil) + + return nil +} + +// Update modifies an existing resource in the database +func (gs *GormStore) Update(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + // Get old resource for index update + oldResource, exists, err := gs.GetByKey(resource.ResourceKey()) + if err != nil { + return err + } + if !exists { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + db := gs.pool.GetDB() + result := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Updates(map[string]interface{}{ + "data": m.Data, + "updated_at": m.UpdatedAt, + }) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Update indices: remove old and add new + gs.indices.UpdateResource(resource, oldResource.(model.Resource)) + + return nil +} + +// Delete removes a resource from the database +func (gs *GormStore) Delete(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + db := gs.pool.GetDB() + result := db.Where("resource_key = ?", resource.ResourceKey()). + Delete(&ResourceModel{}) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Remove from indices + gs.indices.RemoveResource(resource) + + return nil +} + +// List returns all resources of the configured kind from the database +func (gs *GormStore) List() []interface{} { + var models []ResourceModel + db := gs.pool.GetDB() + if err := db.Find(&models).Error; err != nil { Review Comment: The `Find(&models)` operation queries without specifying which table to use. Since `models` is declared as `[]ResourceModel` (not with `ResourceKind` set), GORM will use the default `TableName()` which returns "resources". This should be changed to use a model instance with `ResourceKind` set, e.g., `db.Model(&ResourceModel{ResourceKind: gs.kind.ToString()}).Find(&models)` to query the correct resource-specific table. 
```suggestion if err := db.Model(&ResourceModel{ResourceKind: gs.kind.ToString()}).Find(&models).Error; err != nil { ``` ########## pkg/store/dbcommon/connection_pool.go: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package dbcommon + +import ( + "database/sql" + "fmt" + "sync" + "time" + + "gorm.io/gorm" + + storecfg "github.com/apache/dubbo-admin/pkg/config/store" + "github.com/apache/dubbo-admin/pkg/core/logger" +) + +var ( + // pools stores all connection pools indexed by a unique key (storeType:address) + pools = make(map[string]*ConnectionPool) + // poolsMutex protects concurrent access to the pools map + poolsMutex sync.RWMutex +) + +// ConnectionPoolConfig defines connection pool configuration +type ConnectionPoolConfig struct { + MaxIdleConns int // Maximum number of idle connections + MaxOpenConns int // Maximum number of open connections + ConnMaxLifetime time.Duration // Maximum lifetime of a connection + ConnMaxIdleTime time.Duration // Maximum idle time of a connection +} + +// ConnectionPool manages database connections with connection pooling +type ConnectionPool struct { + db *gorm.DB + sqlDB *sql.DB + address string + storeType storecfg.Type + mu sync.RWMutex + refCount int // Reference counter for the number of stores using this pool + closeOnce sync.Once // Ensure Close is called only once + closed bool // Track if the pool is closed +} + +// GetOrCreatePool returns or creates a connection pool for the given store type and address +// It implements a singleton pattern with reference counting to allow pool reuse across multiple stores +// If a pool already exists for the same storeType and address, it increments the reference count and returns the existing pool +// Otherwise, it creates a new pool with the provided dialector +func GetOrCreatePool(dialector gorm.Dialector, storeType storecfg.Type, address string, config *ConnectionPoolConfig) (*ConnectionPool, error) { + if storeType == storecfg.Memory { + return nil, fmt.Errorf("memory pool store is no need to create connection pool") + } + + poolKey := fmt.Sprintf("%s:%s", storeType, address) + + poolsMutex.Lock() + defer poolsMutex.Unlock() + + // Check if pool already exists + if existingPool, 
exists := pools[poolKey]; exists { + // Increment reference count when reusing existing pool + existingPool.IncrementRef() + logger.Infof("Reusing %s connection pool: address=%s, refCount=%d", storeType, address, existingPool.RefCount()) Review Comment: Potential deadlock: `IncrementRef()` acquires `p.mu.Lock()` while being called from within `GetOrCreatePool()` which holds `poolsMutex.Lock()`. If another goroutine is inside a method that holds `p.mu` (such as `RefCount()`, which acquires `p.mu.RLock()`, or `Close()`, which acquires `p.mu.Lock()`) and needs `poolsMutex` before releasing it, the two goroutines acquire the locks in opposite order and this could cause a deadlock. Consider incrementing `refCount` directly within the `GetOrCreatePool` function while holding `poolsMutex` instead of calling `IncrementRef()`. Note that a direct increment is then no longer guarded by `p.mu`, which `Close()` and `RefCount()` use to protect `refCount`, so those accesses need to be synchronized consistently as well. ```suggestion existingPool.refCount++ logger.Infof("Reusing %s connection pool: address=%s, refCount=%d", storeType, address, existingPool.refCount) ``` ########## pkg/store/dbcommon/gorm_store.go: ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package dbcommon + +import ( + "errors" + "fmt" + "reflect" + "sort" + + "gorm.io/gorm" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +// GormStore is a GORM-backed store implementation for Dubbo resources +// It uses GORM for database operations and maintains in-memory indices for fast lookups +// This implementation is database-agnostic and works with any GORM-supported database +type GormStore struct { + pool *ConnectionPool // Shared connection pool with reference counting + kind model.ResourceKind + address string + indices *Index // In-memory index with thread-safe operations + stopCh chan struct{} +} + +var _ store.ManagedResourceStore = &GormStore{} + +// NewGormStore creates a new GORM store for the specified resource kind +func NewGormStore(kind model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return &GormStore{ + kind: kind, + address: address, + pool: pool, + indices: NewIndex(), + stopCh: make(chan struct{}), + } +} + +// Init initializes the GORM store by migrating the schema +func (gs *GormStore) Init(_ runtime.BuilderContext) error { + // Perform table migration + db := gs.pool.GetDB() + modelForMigration := &ResourceModel{ResourceKind: gs.kind.ToString()} + if err := db.AutoMigrate(modelForMigration); err != nil { + return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) + } + + logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) + return nil +} + +// Start starts the GORM store and monitors for shutdown signal +func (gs *GormStore) Start(_ runtime.Runtime, stopCh <-chan struct{}) error { + logger.Infof("GORM store started for resource kind: %s", gs.kind.ToString()) + + // Monitor stop channel for graceful 
shutdown in a goroutine + go func() { + <-stopCh + logger.Infof("GORM store for %s received stop signal, initiating graceful shutdown", gs.kind.ToString()) + + // Close the internal stop channel to signal any ongoing operations + close(gs.stopCh) + + // Decrement the reference count and potentially close the connection pool + if gs.pool != nil { + if err := gs.pool.Close(); err != nil { + logger.Errorf("Failed to close connection pool for %s: %v", gs.kind.ToString(), err) + } else { + logger.Infof("GORM store for %s shutdown completed", gs.kind.ToString()) + } + } + }() + + return nil +} + +// Add inserts a new resource into the database +func (gs *GormStore) Add(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + var count int64 + db := gs.pool.GetDB() + err := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Count(&count).Error + if err != nil { + return err + } + if count > 0 { + return store.ErrorResourceAlreadyExists( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + if err := db.Create(m).Error; err != nil { + return err + } + + // Update indices after successful DB operation + gs.indices.UpdateResource(resource, nil) + + return nil +} + +// Update modifies an existing resource in the database +func (gs *GormStore) Update(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + // Get old resource for index update + oldResource, exists, err := gs.GetByKey(resource.ResourceKey()) + if err != nil { + return err + } + if !exists { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + db := gs.pool.GetDB() + result := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Updates(map[string]interface{}{ + "data": m.Data, + "updated_at": m.UpdatedAt, + }) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Update indices: remove old and add new + gs.indices.UpdateResource(resource, oldResource.(model.Resource)) + + return nil +} + +// Delete removes a resource from the database +func (gs *GormStore) Delete(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + db := gs.pool.GetDB() + result := db.Where("resource_key = ?", resource.ResourceKey()). + Delete(&ResourceModel{}) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Remove from indices + gs.indices.RemoveResource(resource) + + return nil +} + +// List returns all resources of the configured kind from the database +func (gs *GormStore) List() []interface{} { + var models []ResourceModel + db := gs.pool.GetDB() + if err := db.Find(&models).Error; err != nil { + logger.Errorf("failed to list resources: %v", err) + return []interface{}{} + } + + result := make([]interface{}, 0, len(models)) + for _, m := range models { + resource, err := m.ToResource() + if err != nil { + logger.Errorf("failed to deserialize resource: %v", err) + continue + } + result = append(result, resource) + } + return result +} + +// ListKeys returns all resource keys of the configured kind from the database +func (gs *GormStore) ListKeys() []string { + var keys []string + db := gs.pool.GetDB() + db.Model(&ResourceModel{}). 
+ Pluck("resource_key", &keys) + return keys +} + +// Get retrieves a resource by its object reference +func (gs *GormStore) Get(obj interface{}) (item interface{}, exists bool, err error) { + resource, ok := obj.(model.Resource) + if !ok { + return nil, false, bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + return gs.GetByKey(resource.ResourceKey()) +} + +// GetByKey retrieves a resource by its unique key +func (gs *GormStore) GetByKey(key string) (item interface{}, exists bool, err error) { + var m ResourceModel + db := gs.pool.GetDB() + result := db.Where("resource_key = ?", key). + First(&m) + + if result.Error != nil { + if errors.Is(result.Error, gorm.ErrRecordNotFound) { + return nil, false, nil + } + return nil, false, result.Error + } + + resource, err := m.ToResource() + if err != nil { + return nil, false, err + } + + return resource, true, nil +} + +// Replace atomically replaces all resources in the database with the provided list +func (gs *GormStore) Replace(list []interface{}, _ string) error { + db := gs.pool.GetDB() + return db.Transaction(func(tx *gorm.DB) error { + // Delete all existing records for this resource kind + if err := tx.Delete(&ResourceModel{}, "1=1").Error; err != nil { Review Comment: GORM delete operation uses `&ResourceModel{}` without setting the `ResourceKind` field. This causes the delete to target the wrong table. Should be `&ResourceModel{ResourceKind: gs.kind.ToString()}` to ensure it targets the correct resource-specific table. ```suggestion if err := tx.Delete(&ResourceModel{ResourceKind: gs.kind.ToString()}, "1=1").Error; err != nil { ``` ########## pkg/store/dbcommon/gorm_store.go: ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dbcommon + +import ( + "errors" + "fmt" + "reflect" + "sort" + + "gorm.io/gorm" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +// GormStore is a GORM-backed store implementation for Dubbo resources +// It uses GORM for database operations and maintains in-memory indices for fast lookups +// This implementation is database-agnostic and works with any GORM-supported database +type GormStore struct { + pool *ConnectionPool // Shared connection pool with reference counting + kind model.ResourceKind + address string + indices *Index // In-memory index with thread-safe operations + stopCh chan struct{} +} + +var _ store.ManagedResourceStore = &GormStore{} + +// NewGormStore creates a new GORM store for the specified resource kind +func NewGormStore(kind model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return &GormStore{ + kind: kind, + address: address, + pool: pool, + indices: NewIndex(), + stopCh: make(chan struct{}), + } +} + +// Init initializes the GORM store by migrating the schema +func (gs *GormStore) Init(_ runtime.BuilderContext) error { + // Perform table migration + db := gs.pool.GetDB() + 
modelForMigration := &ResourceModel{ResourceKind: gs.kind.ToString()} + if err := db.AutoMigrate(modelForMigration); err != nil { + return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) + } + + logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) + return nil +} + +// Start starts the GORM store and monitors for shutdown signal +func (gs *GormStore) Start(_ runtime.Runtime, stopCh <-chan struct{}) error { + logger.Infof("GORM store started for resource kind: %s", gs.kind.ToString()) + + // Monitor stop channel for graceful shutdown in a goroutine + go func() { + <-stopCh + logger.Infof("GORM store for %s received stop signal, initiating graceful shutdown", gs.kind.ToString()) + + // Close the internal stop channel to signal any ongoing operations + close(gs.stopCh) + + // Decrement the reference count and potentially close the connection pool + if gs.pool != nil { + if err := gs.pool.Close(); err != nil { + logger.Errorf("Failed to close connection pool for %s: %v", gs.kind.ToString(), err) + } else { + logger.Infof("GORM store for %s shutdown completed", gs.kind.ToString()) + } + } + }() + + return nil +} + +// Add inserts a new resource into the database +func (gs *GormStore) Add(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + var count int64 + db := gs.pool.GetDB() + err := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Count(&count).Error + if err != nil { + return err + } + if count > 0 { + return store.ErrorResourceAlreadyExists( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + if err := db.Create(m).Error; err != nil { + return err + } + + // Update indices after successful DB operation + gs.indices.UpdateResource(resource, nil) + + return nil +} + +// Update modifies an existing resource in the database +func (gs *GormStore) Update(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + // Get old resource for index update + oldResource, exists, err := gs.GetByKey(resource.ResourceKey()) + if err != nil { + return err + } + if !exists { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + db := gs.pool.GetDB() + result := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Updates(map[string]interface{}{ + "data": m.Data, + "updated_at": m.UpdatedAt, + }) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Update indices: remove old and add new + gs.indices.UpdateResource(resource, oldResource.(model.Resource)) + + return nil +} + +// Delete removes a resource from the database +func (gs *GormStore) Delete(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + db := gs.pool.GetDB() + result := db.Where("resource_key = ?", resource.ResourceKey()). + Delete(&ResourceModel{}) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Remove from indices + gs.indices.RemoveResource(resource) + + return nil +} + +// List returns all resources of the configured kind from the database +func (gs *GormStore) List() []interface{} { + var models []ResourceModel + db := gs.pool.GetDB() + if err := db.Find(&models).Error; err != nil { + logger.Errorf("failed to list resources: %v", err) + return []interface{}{} + } + + result := make([]interface{}, 0, len(models)) + for _, m := range models { + resource, err := m.ToResource() + if err != nil { + logger.Errorf("failed to deserialize resource: %v", err) + continue + } + result = append(result, resource) + } + return result +} + +// ListKeys returns all resource keys of the configured kind from the database +func (gs *GormStore) ListKeys() []string { + var keys []string + db := gs.pool.GetDB() + db.Model(&ResourceModel{}). 
+ Pluck("resource_key", &keys) Review Comment: The `ListKeys()` function doesn't check for or handle errors from the `Pluck` operation. If the database query fails, it will silently return an empty slice. The error should be checked and logged, e.g., `if err := db.Model(...).Pluck("resource_key", &keys).Error; err != nil { logger.Errorf("failed to list keys: %v", err) }`. ```suggestion if err := db.Model(&ResourceModel{}). Pluck("resource_key", &keys).Error; err != nil { logger.Errorf("failed to list keys: %v", err) return []string{} } ``` ########## pkg/store/dbcommon/gorm_store.go: ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package dbcommon + +import ( + "errors" + "fmt" + "reflect" + "sort" + + "gorm.io/gorm" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +// GormStore is a GORM-backed store implementation for Dubbo resources +// It uses GORM for database operations and maintains in-memory indices for fast lookups +// This implementation is database-agnostic and works with any GORM-supported database +type GormStore struct { + pool *ConnectionPool // Shared connection pool with reference counting + kind model.ResourceKind + address string + indices *Index // In-memory index with thread-safe operations + stopCh chan struct{} +} + +var _ store.ManagedResourceStore = &GormStore{} + +// NewGormStore creates a new GORM store for the specified resource kind +func NewGormStore(kind model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return &GormStore{ + kind: kind, + address: address, + pool: pool, + indices: NewIndex(), + stopCh: make(chan struct{}), + } +} + +// Init initializes the GORM store by migrating the schema +func (gs *GormStore) Init(_ runtime.BuilderContext) error { + // Perform table migration + db := gs.pool.GetDB() + modelForMigration := &ResourceModel{ResourceKind: gs.kind.ToString()} + if err := db.AutoMigrate(modelForMigration); err != nil { + return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) + } + + logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) + return nil +} + +// Start starts the GORM store and monitors for shutdown signal +func (gs *GormStore) Start(_ runtime.Runtime, stopCh <-chan struct{}) error { + logger.Infof("GORM store started for resource kind: %s", gs.kind.ToString()) + + // Monitor stop channel for graceful 
shutdown in a goroutine + go func() { + <-stopCh + logger.Infof("GORM store for %s received stop signal, initiating graceful shutdown", gs.kind.ToString()) + + // Close the internal stop channel to signal any ongoing operations + close(gs.stopCh) + + // Decrement the reference count and potentially close the connection pool + if gs.pool != nil { + if err := gs.pool.Close(); err != nil { + logger.Errorf("Failed to close connection pool for %s: %v", gs.kind.ToString(), err) + } else { + logger.Infof("GORM store for %s shutdown completed", gs.kind.ToString()) + } + } + }() + + return nil +} + +// Add inserts a new resource into the database +func (gs *GormStore) Add(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + var count int64 + db := gs.pool.GetDB() + err := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Count(&count).Error + if err != nil { + return err + } + if count > 0 { + return store.ErrorResourceAlreadyExists( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + if err := db.Create(m).Error; err != nil { + return err + } + + // Update indices after successful DB operation + gs.indices.UpdateResource(resource, nil) + + return nil +} + +// Update modifies an existing resource in the database +func (gs *GormStore) Update(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + // Get old resource for index update + oldResource, exists, err := gs.GetByKey(resource.ResourceKey()) + if err != nil { + return err + } + if !exists { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + db := gs.pool.GetDB() + result := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Updates(map[string]interface{}{ + "data": m.Data, + "updated_at": m.UpdatedAt, + }) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Update indices: remove old and add new + gs.indices.UpdateResource(resource, oldResource.(model.Resource)) + + return nil +} + +// Delete removes a resource from the database +func (gs *GormStore) Delete(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + db := gs.pool.GetDB() + result := db.Where("resource_key = ?", resource.ResourceKey()). + Delete(&ResourceModel{}) Review Comment: GORM delete operation uses `&ResourceModel{}` without setting the `ResourceKind` field. This causes the delete to target the wrong table. Should be `&ResourceModel{ResourceKind: gs.kind.ToString()}` to ensure it targets the correct resource-specific table. ```suggestion Delete(&ResourceModel{ResourceKind: gs.kind.ToString()}) ``` ########## pkg/store/dbcommon/gorm_store.go: ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dbcommon + +import ( + "errors" + "fmt" + "reflect" + "sort" + + "gorm.io/gorm" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +// GormStore is a GORM-backed store implementation for Dubbo resources +// It uses GORM for database operations and maintains in-memory indices for fast lookups +// This implementation is database-agnostic and works with any GORM-supported database +type GormStore struct { + pool *ConnectionPool // Shared connection pool with reference counting + kind model.ResourceKind + address string + indices *Index // In-memory index with thread-safe operations + stopCh chan struct{} +} + +var _ store.ManagedResourceStore = &GormStore{} + +// NewGormStore creates a new GORM store for the specified resource kind +func NewGormStore(kind model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return &GormStore{ + kind: kind, + address: address, + pool: pool, + indices: NewIndex(), + stopCh: make(chan struct{}), + } +} + +// Init initializes the GORM store by migrating the schema +func (gs *GormStore) Init(_ runtime.BuilderContext) error { + // Perform table migration + db := gs.pool.GetDB() + modelForMigration := &ResourceModel{ResourceKind: gs.kind.ToString()} + if err := db.AutoMigrate(modelForMigration); err != nil { + return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) + } + + logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) + return nil +} + +// Start starts the GORM store and monitors for shutdown signal +func (gs *GormStore) Start(_ runtime.Runtime, stopCh <-chan struct{}) error { + 
logger.Infof("GORM store started for resource kind: %s", gs.kind.ToString()) + + // Monitor stop channel for graceful shutdown in a goroutine + go func() { + <-stopCh + logger.Infof("GORM store for %s received stop signal, initiating graceful shutdown", gs.kind.ToString()) + + // Close the internal stop channel to signal any ongoing operations + close(gs.stopCh) + + // Decrement the reference count and potentially close the connection pool + if gs.pool != nil { + if err := gs.pool.Close(); err != nil { + logger.Errorf("Failed to close connection pool for %s: %v", gs.kind.ToString(), err) + } else { + logger.Infof("GORM store for %s shutdown completed", gs.kind.ToString()) + } + } + }() + + return nil +} + +// Add inserts a new resource into the database +func (gs *GormStore) Add(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + var count int64 + db := gs.pool.GetDB() + err := db.Model(&ResourceModel{}). Review Comment: GORM operations are using `&ResourceModel{}` without setting the `ResourceKind` field. This means `TableName()` will return "resources" instead of the resource-specific table (e.g., "resources_application"). Operations like `db.Model(&ResourceModel{})` in `Add()`, `Update()`, `ListKeys()` and `Delete()` should use `&ResourceModel{ResourceKind: gs.kind.ToString()}` to ensure operations target the correct table. ```suggestion err := db.Model(&ResourceModel{ResourceKind: gs.kind.ToString()}). ``` ########## pkg/store/dbcommon/gorm_store.go: ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dbcommon + +import ( + "errors" + "fmt" + "reflect" + "sort" + + "gorm.io/gorm" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +// GormStore is a GORM-backed store implementation for Dubbo resources +// It uses GORM for database operations and maintains in-memory indices for fast lookups +// This implementation is database-agnostic and works with any GORM-supported database +type GormStore struct { + pool *ConnectionPool // Shared connection pool with reference counting + kind model.ResourceKind + address string + indices *Index // In-memory index with thread-safe operations + stopCh chan struct{} +} + +var _ store.ManagedResourceStore = &GormStore{} + +// NewGormStore creates a new GORM store for the specified resource kind +func NewGormStore(kind model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return &GormStore{ + kind: kind, + address: address, + pool: pool, + indices: NewIndex(), + stopCh: make(chan struct{}), + } +} + +// Init initializes the GORM store by migrating the schema +func (gs *GormStore) Init(_ runtime.BuilderContext) error { + // Perform table migration + db := gs.pool.GetDB() + 
modelForMigration := &ResourceModel{ResourceKind: gs.kind.ToString()} + if err := db.AutoMigrate(modelForMigration); err != nil { + return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) + } + + logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) + return nil +} Review Comment: The in-memory indices are not populated during store initialization. When a GormStore starts with existing data in the database, the indices will be empty until resources are explicitly added/updated. Consider adding an index rebuild operation in `Init()` or `Start()` that loads all existing resources and populates the indices, similar to how `Replace()` rebuilds indices after batch insertion. ########## pkg/store/mysql/mysql.go: ########## @@ -17,4 +17,42 @@ package mysql -// TODO implement memory resource store, refer to GORM https://gorm.io/docs/ +import ( + "gorm.io/driver/mysql" + + storecfg "github.com/apache/dubbo-admin/pkg/config/store" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/store" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +func init() { + store.RegisterFactory(&mysqlStoreFactory{}) +} + +// mysqlStoreFactory is the factory for creating MySQL store instances +type mysqlStoreFactory struct{} + +var _ store.Factory = &mysqlStoreFactory{} + +// Support checks if this factory supports the given store type +func (f *mysqlStoreFactory) Support(s storecfg.Type) bool { + return s == storecfg.MySQL +} + +// New creates a new MySQL store instance for the specified resource kind +func (f *mysqlStoreFactory) New(kind model.ResourceKind, cfg *storecfg.Config) (store.ManagedResourceStore, error) { + // Get or create connection pool with MySQL dialector + pool, err := dbcommon.GetOrCreatePool( + mysql.Open(cfg.Address), + storecfg.MySQL, + cfg.Address, + dbcommon.DefaultConnectionPoolConfig(), + ) + if err != nil { + return nil, err + } + + // Create GormStore with 
the pool + return dbcommon.NewGormStore(kind, cfg.Address, pool), nil +} Review Comment: The MySQL store implementation lacks test coverage. Since the memory store has comprehensive tests (see `pkg/store/memory/store_test.go`), similar test coverage should be added for the MySQL store to ensure correctness of database operations, connection pooling, and resource management. ########## pkg/store/postgres/postgres.go: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package postgres + +import ( + "gorm.io/driver/postgres" + + storecfg "github.com/apache/dubbo-admin/pkg/config/store" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/store" + "github.com/apache/dubbo-admin/pkg/store/dbcommon" +) + +func init() { + store.RegisterFactory(&postgresStoreFactory{}) +} + +// postgresStoreFactory is the factory for creating PostgreSQL store instances +type postgresStoreFactory struct{} + +var _ store.Factory = &postgresStoreFactory{} + +// Support checks if this factory supports the given store type +func (f *postgresStoreFactory) Support(s storecfg.Type) bool { + return s == storecfg.Postgres +} + +// New creates a new PostgreSQL store instance for the specified resource kind +func (f *postgresStoreFactory) New(kind model.ResourceKind, cfg *storecfg.Config) (store.ManagedResourceStore, error) { + // Get or create connection pool with PostgreSQL dialector + pool, err := dbcommon.GetOrCreatePool( + postgres.Open(cfg.Address), + storecfg.Postgres, + cfg.Address, + dbcommon.DefaultConnectionPoolConfig(), + ) + if err != nil { + return nil, err + } + + // Create GormStore with the pool + return dbcommon.NewGormStore(kind, cfg.Address, pool), nil +} Review Comment: The PostgreSQL store implementation lacks test coverage. Since the memory store has comprehensive tests (see `pkg/store/memory/store_test.go`), similar test coverage should be added for the PostgreSQL store to ensure correctness of database operations, connection pooling, and resource management. ########## pkg/store/dbcommon/gorm_store.go: ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dbcommon + +import ( + "errors" + "fmt" + "reflect" + "sort" + + "gorm.io/gorm" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +// GormStore is a GORM-backed store implementation for Dubbo resources +// It uses GORM for database operations and maintains in-memory indices for fast lookups +// This implementation is database-agnostic and works with any GORM-supported database +type GormStore struct { + pool *ConnectionPool // Shared connection pool with reference counting + kind model.ResourceKind + address string + indices *Index // In-memory index with thread-safe operations + stopCh chan struct{} +} + +var _ store.ManagedResourceStore = &GormStore{} + +// NewGormStore creates a new GORM store for the specified resource kind +func NewGormStore(kind model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return &GormStore{ + kind: kind, + address: address, + pool: pool, + indices: NewIndex(), + stopCh: make(chan struct{}), + } +} + +// Init initializes the GORM store by migrating the schema +func (gs *GormStore) Init(_ runtime.BuilderContext) error { + // Perform table migration + db := gs.pool.GetDB() + 
modelForMigration := &ResourceModel{ResourceKind: gs.kind.ToString()} + if err := db.AutoMigrate(modelForMigration); err != nil { + return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) + } + + logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) + return nil +} + +// Start starts the GORM store and monitors for shutdown signal +func (gs *GormStore) Start(_ runtime.Runtime, stopCh <-chan struct{}) error { + logger.Infof("GORM store started for resource kind: %s", gs.kind.ToString()) + + // Monitor stop channel for graceful shutdown in a goroutine + go func() { + <-stopCh + logger.Infof("GORM store for %s received stop signal, initiating graceful shutdown", gs.kind.ToString()) + + // Close the internal stop channel to signal any ongoing operations + close(gs.stopCh) + + // Decrement the reference count and potentially close the connection pool + if gs.pool != nil { + if err := gs.pool.Close(); err != nil { + logger.Errorf("Failed to close connection pool for %s: %v", gs.kind.ToString(), err) + } else { + logger.Infof("GORM store for %s shutdown completed", gs.kind.ToString()) + } + } + }() + + return nil +} + +// Add inserts a new resource into the database +func (gs *GormStore) Add(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + var count int64 + db := gs.pool.GetDB() + err := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Count(&count).Error + if err != nil { + return err + } + if count > 0 { + return store.ErrorResourceAlreadyExists( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + if err := db.Create(m).Error; err != nil { + return err + } + + // Update indices after successful DB operation + gs.indices.UpdateResource(resource, nil) + + return nil +} + +// Update modifies an existing resource in the database +func (gs *GormStore) Update(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + // Get old resource for index update + oldResource, exists, err := gs.GetByKey(resource.ResourceKey()) + if err != nil { + return err + } + if !exists { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + db := gs.pool.GetDB() + result := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Updates(map[string]interface{}{ + "data": m.Data, + "updated_at": m.UpdatedAt, + }) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Update indices: remove old and add new + gs.indices.UpdateResource(resource, oldResource.(model.Resource)) + + return nil +} + +// Delete removes a resource from the database +func (gs *GormStore) Delete(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + db := gs.pool.GetDB() + result := db.Where("resource_key = ?", resource.ResourceKey()). + Delete(&ResourceModel{}) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Remove from indices + gs.indices.RemoveResource(resource) + + return nil +} + +// List returns all resources of the configured kind from the database +func (gs *GormStore) List() []interface{} { + var models []ResourceModel + db := gs.pool.GetDB() + if err := db.Find(&models).Error; err != nil { + logger.Errorf("failed to list resources: %v", err) + return []interface{}{} + } + + result := make([]interface{}, 0, len(models)) + for _, m := range models { + resource, err := m.ToResource() + if err != nil { + logger.Errorf("failed to deserialize resource: %v", err) + continue + } + result = append(result, resource) + } + return result +} + +// ListKeys returns all resource keys of the configured kind from the database +func (gs *GormStore) ListKeys() []string { + var keys []string + db := gs.pool.GetDB() + db.Model(&ResourceModel{}). Review Comment: GORM operation uses `&ResourceModel{}` without setting the `ResourceKind` field. 
This causes the query to target the wrong table. Should be `&ResourceModel{ResourceKind: gs.kind.ToString()}` to ensure it targets the correct resource-specific table. ```suggestion db.Model(&ResourceModel{ResourceKind: gs.kind.ToString()}). ``` ########## pkg/store/dbcommon/gorm_store.go: ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package dbcommon + +import ( + "errors" + "fmt" + "reflect" + "sort" + + "gorm.io/gorm" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +// GormStore is a GORM-backed store implementation for Dubbo resources +// It uses GORM for database operations and maintains in-memory indices for fast lookups +// This implementation is database-agnostic and works with any GORM-supported database +type GormStore struct { + pool *ConnectionPool // Shared connection pool with reference counting + kind model.ResourceKind + address string + indices *Index // In-memory index with thread-safe operations + stopCh chan struct{} +} + +var _ store.ManagedResourceStore = &GormStore{} + +// NewGormStore creates a new GORM store for the specified resource kind +func NewGormStore(kind model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return &GormStore{ + kind: kind, + address: address, + pool: pool, + indices: NewIndex(), + stopCh: make(chan struct{}), + } +} + +// Init initializes the GORM store by migrating the schema +func (gs *GormStore) Init(_ runtime.BuilderContext) error { + // Perform table migration + db := gs.pool.GetDB() + modelForMigration := &ResourceModel{ResourceKind: gs.kind.ToString()} + if err := db.AutoMigrate(modelForMigration); err != nil { + return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) + } + + logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) + return nil +} + +// Start starts the GORM store and monitors for shutdown signal +func (gs *GormStore) Start(_ runtime.Runtime, stopCh <-chan struct{}) error { + logger.Infof("GORM store started for resource kind: %s", gs.kind.ToString()) + + // Monitor stop channel for graceful 
shutdown in a goroutine + go func() { + <-stopCh + logger.Infof("GORM store for %s received stop signal, initiating graceful shutdown", gs.kind.ToString()) + + // Close the internal stop channel to signal any ongoing operations + close(gs.stopCh) + + // Decrement the reference count and potentially close the connection pool + if gs.pool != nil { + if err := gs.pool.Close(); err != nil { + logger.Errorf("Failed to close connection pool for %s: %v", gs.kind.ToString(), err) + } else { + logger.Infof("GORM store for %s shutdown completed", gs.kind.ToString()) + } + } + }() + + return nil +} + +// Add inserts a new resource into the database +func (gs *GormStore) Add(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + var count int64 + db := gs.pool.GetDB() + err := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Count(&count).Error + if err != nil { + return err + } + if count > 0 { + return store.ErrorResourceAlreadyExists( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + if err := db.Create(m).Error; err != nil { + return err + } + + // Update indices after successful DB operation + gs.indices.UpdateResource(resource, nil) + + return nil +} + +// Update modifies an existing resource in the database +func (gs *GormStore) Update(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + // Get old resource for index update + oldResource, exists, err := gs.GetByKey(resource.ResourceKey()) + if err != nil { + return err + } + if !exists { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + db := gs.pool.GetDB() + result := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). + Updates(map[string]interface{}{ Review Comment: The `Update` operation only updates the `data` and `updated_at` fields but doesn't update `name` or `mesh` fields. If a resource's name or mesh changes, these indexed fields in the database won't be updated, causing the database indexes to become stale. Consider updating these fields as well, while keeping the existing `updated_at` assignment: `Updates(map[string]interface{}{"name": m.Name, "mesh": m.Mesh, "data": m.Data, "updated_at": m.UpdatedAt})`. 
```suggestion Updates(map[string]interface{}{ "name": m.Name, "mesh": m.Mesh, ``` ########## pkg/store/dbcommon/gorm_store.go: ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dbcommon + +import ( + "errors" + "fmt" + "reflect" + "sort" + + "gorm.io/gorm" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +// GormStore is a GORM-backed store implementation for Dubbo resources +// It uses GORM for database operations and maintains in-memory indices for fast lookups +// This implementation is database-agnostic and works with any GORM-supported database +type GormStore struct { + pool *ConnectionPool // Shared connection pool with reference counting + kind model.ResourceKind + address string + indices *Index // In-memory index with thread-safe operations + stopCh chan struct{} +} + +var _ store.ManagedResourceStore = &GormStore{} + +// NewGormStore creates a new GORM store for the specified resource kind +func NewGormStore(kind 
model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return &GormStore{ + kind: kind, + address: address, + pool: pool, + indices: NewIndex(), + stopCh: make(chan struct{}), + } +} + +// Init initializes the GORM store by migrating the schema +func (gs *GormStore) Init(_ runtime.BuilderContext) error { + // Perform table migration + db := gs.pool.GetDB() + modelForMigration := &ResourceModel{ResourceKind: gs.kind.ToString()} + if err := db.AutoMigrate(modelForMigration); err != nil { + return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) + } + + logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) + return nil +} + +// Start starts the GORM store and monitors for shutdown signal +func (gs *GormStore) Start(_ runtime.Runtime, stopCh <-chan struct{}) error { + logger.Infof("GORM store started for resource kind: %s", gs.kind.ToString()) + + // Monitor stop channel for graceful shutdown in a goroutine + go func() { + <-stopCh + logger.Infof("GORM store for %s received stop signal, initiating graceful shutdown", gs.kind.ToString()) + + // Close the internal stop channel to signal any ongoing operations + close(gs.stopCh) + + // Decrement the reference count and potentially close the connection pool + if gs.pool != nil { + if err := gs.pool.Close(); err != nil { + logger.Errorf("Failed to close connection pool for %s: %v", gs.kind.ToString(), err) + } else { + logger.Infof("GORM store for %s shutdown completed", gs.kind.ToString()) + } + } + }() + + return nil +} + +// Add inserts a new resource into the database +func (gs *GormStore) Add(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + var count int64 + db := gs.pool.GetDB() + err := 
db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). + Count(&count).Error + if err != nil { + return err + } + if count > 0 { + return store.ErrorResourceAlreadyExists( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + if err := db.Create(m).Error; err != nil { + return err + } + + // Update indices after successful DB operation + gs.indices.UpdateResource(resource, nil) + + return nil +} + +// Update modifies an existing resource in the database +func (gs *GormStore) Update(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + // Get old resource for index update + oldResource, exists, err := gs.GetByKey(resource.ResourceKey()) + if err != nil { + return err + } + if !exists { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + db := gs.pool.GetDB() + result := db.Model(&ResourceModel{}). Review Comment: GORM operation uses `&ResourceModel{}` without setting the `ResourceKind` field. This causes the query to target the wrong table. Should be `&ResourceModel{ResourceKind: gs.kind.ToString()}` to ensure it targets the correct resource-specific table. ```suggestion result := db.Model(&ResourceModel{ResourceKind: gs.kind.ToString()}). ``` ########## pkg/store/dbcommon/gorm_store.go: ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dbcommon + +import ( + "errors" + "fmt" + "reflect" + "sort" + + "gorm.io/gorm" + "k8s.io/client-go/tools/cache" + + "github.com/apache/dubbo-admin/pkg/common/bizerror" + "github.com/apache/dubbo-admin/pkg/core/logger" + "github.com/apache/dubbo-admin/pkg/core/resource/model" + "github.com/apache/dubbo-admin/pkg/core/runtime" + "github.com/apache/dubbo-admin/pkg/core/store" +) + +// GormStore is a GORM-backed store implementation for Dubbo resources +// It uses GORM for database operations and maintains in-memory indices for fast lookups +// This implementation is database-agnostic and works with any GORM-supported database +type GormStore struct { + pool *ConnectionPool // Shared connection pool with reference counting + kind model.ResourceKind + address string + indices *Index // In-memory index with thread-safe operations + stopCh chan struct{} +} + +var _ store.ManagedResourceStore = &GormStore{} + +// NewGormStore creates a new GORM store for the specified resource kind +func NewGormStore(kind model.ResourceKind, address string, pool *ConnectionPool) *GormStore { + return &GormStore{ + kind: kind, + address: address, + pool: pool, + indices: NewIndex(), + stopCh: make(chan struct{}), + } +} + +// Init initializes the GORM store by migrating the schema +func (gs 
*GormStore) Init(_ runtime.BuilderContext) error { + // Perform table migration + db := gs.pool.GetDB() + modelForMigration := &ResourceModel{ResourceKind: gs.kind.ToString()} + if err := db.AutoMigrate(modelForMigration); err != nil { + return fmt.Errorf("failed to migrate schema for %s: %w", gs.kind.ToString(), err) + } + + logger.Infof("GORM store initialized for resource kind: %s", gs.kind.ToString()) + return nil +} + +// Start starts the GORM store and monitors for shutdown signal +func (gs *GormStore) Start(_ runtime.Runtime, stopCh <-chan struct{}) error { + logger.Infof("GORM store started for resource kind: %s", gs.kind.ToString()) + + // Monitor stop channel for graceful shutdown in a goroutine + go func() { + <-stopCh + logger.Infof("GORM store for %s received stop signal, initiating graceful shutdown", gs.kind.ToString()) + + // Close the internal stop channel to signal any ongoing operations + close(gs.stopCh) + + // Decrement the reference count and potentially close the connection pool + if gs.pool != nil { + if err := gs.pool.Close(); err != nil { + logger.Errorf("Failed to close connection pool for %s: %v", gs.kind.ToString(), err) + } else { + logger.Infof("GORM store for %s shutdown completed", gs.kind.ToString()) + } + } + }() + + return nil +} + +// Add inserts a new resource into the database +func (gs *GormStore) Add(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + var count int64 + db := gs.pool.GetDB() + err := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Count(&count).Error + if err != nil { + return err + } + if count > 0 { + return store.ErrorResourceAlreadyExists( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + if err := db.Create(m).Error; err != nil { + return err + } + + // Update indices after successful DB operation + gs.indices.UpdateResource(resource, nil) + + return nil +} + +// Update modifies an existing resource in the database +func (gs *GormStore) Update(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + if resource.ResourceKind() != gs.kind { + return fmt.Errorf("resource kind mismatch: expected %s, got %s", gs.kind, resource.ResourceKind()) + } + + // Get old resource for index update + oldResource, exists, err := gs.GetByKey(resource.ResourceKey()) + if err != nil { + return err + } + if !exists { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + + db := gs.pool.GetDB() + result := db.Model(&ResourceModel{}). + Where("resource_key = ?", resource.ResourceKey()). 
+ Updates(map[string]interface{}{ + "data": m.Data, + "updated_at": m.UpdatedAt, + }) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Update indices: remove old and add new + gs.indices.UpdateResource(resource, oldResource.(model.Resource)) + + return nil +} + +// Delete removes a resource from the database +func (gs *GormStore) Delete(obj interface{}) error { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + db := gs.pool.GetDB() + result := db.Where("resource_key = ?", resource.ResourceKey()). + Delete(&ResourceModel{}) + + if result.Error != nil { + return result.Error + } + + if result.RowsAffected == 0 { + return store.ErrorResourceNotFound( + resource.ResourceKind().ToString(), + resource.ResourceMeta().Name, + resource.MeshName(), + ) + } + + // Remove from indices + gs.indices.RemoveResource(resource) + + return nil +} + +// List returns all resources of the configured kind from the database +func (gs *GormStore) List() []interface{} { + var models []ResourceModel + db := gs.pool.GetDB() + if err := db.Find(&models).Error; err != nil { + logger.Errorf("failed to list resources: %v", err) + return []interface{}{} + } + + result := make([]interface{}, 0, len(models)) + for _, m := range models { + resource, err := m.ToResource() + if err != nil { + logger.Errorf("failed to deserialize resource: %v", err) + continue + } + result = append(result, resource) + } + return result +} + +// ListKeys returns all resource keys of the configured kind from the database +func (gs *GormStore) ListKeys() []string { + var keys []string + db := gs.pool.GetDB() + db.Model(&ResourceModel{}). 
+ Pluck("resource_key", &keys) + return keys +} + +// Get retrieves a resource by its object reference +func (gs *GormStore) Get(obj interface{}) (item interface{}, exists bool, err error) { + resource, ok := obj.(model.Resource) + if !ok { + return nil, false, bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + return gs.GetByKey(resource.ResourceKey()) +} + +// GetByKey retrieves a resource by its unique key +func (gs *GormStore) GetByKey(key string) (item interface{}, exists bool, err error) { + var m ResourceModel + db := gs.pool.GetDB() + result := db.Where("resource_key = ?", key). + First(&m) + + if result.Error != nil { + if errors.Is(result.Error, gorm.ErrRecordNotFound) { + return nil, false, nil + } + return nil, false, result.Error + } + + resource, err := m.ToResource() + if err != nil { + return nil, false, err + } + + return resource, true, nil +} + +// Replace atomically replaces all resources in the database with the provided list +func (gs *GormStore) Replace(list []interface{}, _ string) error { + db := gs.pool.GetDB() + return db.Transaction(func(tx *gorm.DB) error { + // Delete all existing records for this resource kind + if err := tx.Delete(&ResourceModel{}, "1=1").Error; err != nil { + return err + } + + // Clear all indices + gs.clearIndices() + + // Return early if list is empty + if len(list) == 0 { + return nil + } + + // Convert all resources to ResourceModel + models := make([]*ResourceModel, 0, len(list)) + resources := make([]model.Resource, 0, len(list)) + for _, obj := range list { + resource, ok := obj.(model.Resource) + if !ok { + return bizerror.NewAssertionError("Resource", reflect.TypeOf(obj).Name()) + } + + m, err := FromResource(resource) + if err != nil { + return err + } + models = append(models, m) + resources = append(resources, resource) + } + + // Batch insert all models at once + if err := tx.CreateInBatches(models, 100).Error; err != nil { + return err + } + + // Rebuild indices for all resources 
+ for _, resource := range resources { + gs.indices.UpdateResource(resource, nil) + } + + return nil + }) +} + +func (gs *GormStore) Resync() error { + return nil +} + +func (gs *GormStore) Index(indexName string, obj interface{}) ([]interface{}, error) { + if !gs.indices.IndexExists(indexName) { + return nil, fmt.Errorf("index %s does not exist", indexName) + } + + indexFunc := gs.indices.GetIndexers()[indexName] + indexValues, err := indexFunc(obj) + if err != nil { + return nil, err + } + + if len(indexValues) == 0 { + return []interface{}{}, nil + } + + return gs.findByIndex(indexName, indexValues[0]) +} + +func (gs *GormStore) IndexKeys(indexName, indexedValue string) ([]string, error) { + if !gs.indices.IndexExists(indexName) { + return nil, fmt.Errorf("index %s does not exist", indexName) + } + + resources, err := gs.findByIndex(indexName, indexedValue) + if err != nil { + return nil, err + } + + keys := make([]string, 0, len(resources)) + for _, obj := range resources { + if resource, ok := obj.(model.Resource); ok { + keys = append(keys, resource.ResourceKey()) + } + } + + return keys, nil +} + +func (gs *GormStore) ListIndexFuncValues(indexName string) []string { + if !gs.indices.IndexExists(indexName) { + return []string{} + } + + return gs.indices.ListIndexFuncValues(indexName) +} + +func (gs *GormStore) ByIndex(indexName, indexedValue string) ([]interface{}, error) { + if !gs.indices.IndexExists(indexName) { + return nil, fmt.Errorf("index %s does not exist", indexName) + } + + return gs.findByIndex(indexName, indexedValue) +} + +func (gs *GormStore) GetIndexers() cache.Indexers { + return gs.indices.GetIndexers() +} + +func (gs *GormStore) AddIndexers(newIndexers cache.Indexers) error { + return gs.indices.AddIndexers(newIndexers) +} + +func (gs *GormStore) GetByKeys(keys []string) ([]model.Resource, error) { + if len(keys) == 0 { + return []model.Resource{}, nil + } + + var models []ResourceModel + db := gs.pool.GetDB() + err := 
db.Where("resource_key IN ?", keys). Review Comment: The `Find(&models)` operation queries without specifying which table to use. Since `models` is declared as `[]ResourceModel` (not with `ResourceKind` set), GORM will use the default `TableName()` which returns "resources". This should be changed to use a model instance with `ResourceKind` set, e.g., `db.Model(&ResourceModel{ResourceKind: gs.kind.ToString()}).Where("resource_key IN ?", keys).Find(&models)` to query the correct resource-specific table. ```suggestion err := db.Model(&ResourceModel{ResourceKind: gs.kind.ToString()}).Where("resource_key IN ?", keys). ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
