This is an automated email from the ASF dual-hosted git repository. DImuthuUpe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 523b191c770a000284f152baa641c6bef1e36352 Author: DImuthuUpe <[email protected]> AuthorDate: Sat May 16 20:25:56 2026 -0400 Implement service layer for compute clusters --- .../db/migrations/000002_compute_clusters.down.sql | 18 ++++ .../db/migrations/000002_compute_clusters.up.sql | 26 +++++ internal/server/server.go | 36 +++++++ internal/store/compute_cluster_store.go | 92 ++++++++++++++++ internal/store/store.go | 16 +++ pkg/models/allocation.go | 4 +- pkg/service/compute_cluster.go | 116 +++++++++++++++++++++ pkg/service/service.go | 22 ++-- 8 files changed, 318 insertions(+), 12 deletions(-) diff --git a/internal/db/migrations/000002_compute_clusters.down.sql b/internal/db/migrations/000002_compute_clusters.down.sql new file mode 100644 index 000000000..63df67edb --- /dev/null +++ b/internal/db/migrations/000002_compute_clusters.down.sql @@ -0,0 +1,18 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +DROP TABLE IF EXISTS compute_clusters; diff --git a/internal/db/migrations/000002_compute_clusters.up.sql b/internal/db/migrations/000002_compute_clusters.up.sql new file mode 100644 index 000000000..098ab19f7 --- /dev/null +++ b/internal/db/migrations/000002_compute_clusters.up.sql @@ -0,0 +1,26 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +CREATE TABLE IF NOT EXISTS compute_clusters +( + id VARCHAR(255) NOT NULL, + name VARCHAR(255) NOT NULL, + created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6), + PRIMARY KEY (id), + UNIQUE KEY uq_compute_clusters_name (name) +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci; diff --git a/internal/server/server.go b/internal/server/server.go index bc9e9746a..47679bee4 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -59,6 +59,10 @@ func (s *Server) routes() { s.mux.HandleFunc("POST /projects", s.createProject) s.mux.HandleFunc("GET /projects/{id}", s.getProject) + + s.mux.HandleFunc("POST /compute-clusters", s.createComputeCluster) + s.mux.HandleFunc("GET /compute-clusters", s.listComputeClusters) + s.mux.HandleFunc("GET /compute-clusters/{id}", s.getComputeCluster) } func (s *Server) healthz(w http.ResponseWriter, _ *http.Request) { @@ -134,6 +138,38 @@ func (s *Server) getProject(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, p) } +func (s *Server) createComputeCluster(w http.ResponseWriter, r *http.Request) { + var c models.ComputeCluster + if err := decodeJSON(r, &c); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + created, err := s.svc.CreateComputeCluster(r.Context(), &c) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusCreated, created) +} + +func (s *Server) getComputeCluster(w http.ResponseWriter, r *http.Request) { + c, err := s.svc.GetComputeCluster(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, c) +} + +func (s *Server) listComputeClusters(w http.ResponseWriter, r *http.Request) { + clusters, err := s.svc.ListComputeClusters(r.Context()) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, clusters) +} + // LoggingMiddleware logs every request once it completes. func LoggingMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/store/compute_cluster_store.go b/internal/store/compute_cluster_store.go new file mode 100644 index 000000000..c27b77bae --- /dev/null +++ b/internal/store/compute_cluster_store.go @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package store + +import ( + "context" + "database/sql" + "errors" + + "github.com/jmoiron/sqlx" + + "github.com/apache/airavata-custos/pkg/models" +) + +type mysqlComputeClusterStore struct { + db *sqlx.DB +} + +// NewComputeClusterStore returns a MySQL-backed ComputeClusterStore. +func NewComputeClusterStore(db *sqlx.DB) ComputeClusterStore { + return &mysqlComputeClusterStore{db: db} +} + +func (s *mysqlComputeClusterStore) FindByID(ctx context.Context, id string) (*models.ComputeCluster, error) { + var c models.ComputeCluster + err := s.db.GetContext(ctx, &c, + `SELECT id, name FROM compute_clusters WHERE id = ?`, id) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &c, nil +} + +func (s *mysqlComputeClusterStore) FindByName(ctx context.Context, name string) (*models.ComputeCluster, error) { + var c models.ComputeCluster + err := s.db.GetContext(ctx, &c, + `SELECT id, name FROM compute_clusters WHERE name = ?`, name) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &c, nil +} + +func (s *mysqlComputeClusterStore) List(ctx context.Context) ([]models.ComputeCluster, error) { + var clusters []models.ComputeCluster + err := s.db.SelectContext(ctx, &clusters, + `SELECT id, name FROM compute_clusters ORDER BY name`) + if err != nil { + return nil, err + } + return clusters, nil +} + +func (s *mysqlComputeClusterStore) Create(ctx context.Context, tx *sql.Tx, c *models.ComputeCluster) error { + _, err := tx.ExecContext(ctx, + `INSERT INTO compute_clusters (id, name) VALUES (?, ?)`, + c.ID, c.Name) + return err +} + +func (s *mysqlComputeClusterStore) Update(ctx context.Context, tx *sql.Tx, c *models.ComputeCluster) error { + _, err := tx.ExecContext(ctx, + `UPDATE compute_clusters SET name = ? WHERE id = ?`, + c.Name, c.ID) + return err +} + +func (s *mysqlComputeClusterStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { + _, err := tx.ExecContext(ctx, `DELETE FROM compute_clusters WHERE id = ?`, id) + return err +} diff --git a/internal/store/store.go b/internal/store/store.go index f0b1e1206..c9a6525a8 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -54,6 +54,22 @@ type OrganizationStore interface { Delete(ctx context.Context, tx *sql.Tx, id string) error } +// ComputeClusterStore defines persistence operations for compute clusters. +type ComputeClusterStore interface { + // FindByID returns the cluster with the given ID, or nil if not found. + FindByID(ctx context.Context, id string) (*models.ComputeCluster, error) + // FindByName returns the cluster with the given name, or nil if not found. + FindByName(ctx context.Context, name string) (*models.ComputeCluster, error) + // List returns all compute clusters. + List(ctx context.Context) ([]models.ComputeCluster, error) + // Create inserts a new cluster within the provided transaction. + Create(ctx context.Context, tx *sql.Tx, c *models.ComputeCluster) error + // Update replaces mutable fields of an existing cluster within the provided transaction. + Update(ctx context.Context, tx *sql.Tx, c *models.ComputeCluster) error + // Delete removes a cluster by ID within the provided transaction. + Delete(ctx context.Context, tx *sql.Tx, id string) error +} + // ProjectStore defines persistence operations for projects. type ProjectStore interface { // FindByID returns the project with the given ID, or nil if not found. diff --git a/pkg/models/allocation.go b/pkg/models/allocation.go index f024dbdf2..e3c2c5d94 100644 --- a/pkg/models/allocation.go +++ b/pkg/models/allocation.go @@ -11,8 +11,8 @@ const ( ) type ComputeCluster struct { - ID string `json:"id"` - Name string `json:"name"` // A human-readable name for the compute cluster, e.g., "Cluster A", "Cluster B", etc. + ID string `json:"id" db:"id"` + Name string `json:"name" db:"name"` // A human-readable name for the compute cluster, e.g., "Cluster A", "Cluster B", etc. } type ComputeAllocation struct { diff --git a/pkg/service/compute_cluster.go b/pkg/service/compute_cluster.go new file mode 100644 index 000000000..106be4a21 --- /dev/null +++ b/pkg/service/compute_cluster.go @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package service + +import ( + "context" + "database/sql" + "fmt" + + "github.com/apache/airavata-custos/pkg/models" +) + +// CreateComputeCluster persists a new compute cluster. If cluster.ID is empty +// a new UUID is generated. The (possibly populated) cluster is returned. +func (s *Service) CreateComputeCluster(ctx context.Context, cluster *models.ComputeCluster) (*models.ComputeCluster, error) { + if cluster == nil { + return nil, fmt.Errorf("%w: compute cluster is nil", ErrInvalidInput) + } + if cluster.Name == "" { + return nil, fmt.Errorf("%w: compute cluster name is required", ErrInvalidInput) + } + if cluster.ID == "" { + cluster.ID = newID() + } + + if existing, err := s.clusters.FindByName(ctx, cluster.Name); err != nil { + return nil, fmt.Errorf("lookup compute cluster by name: %w", err) + } else if existing != nil { + return nil, fmt.Errorf("%w: compute cluster with name %q", ErrAlreadyExists, cluster.Name) + } + + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.clusters.Create(ctx, tx, cluster) + }); err != nil { + return nil, fmt.Errorf("create compute cluster: %w", err) + } + return cluster, nil +} + +// GetComputeCluster retrieves a compute cluster by its ID. Returns +// ErrNotFound when no cluster matches. +func (s *Service) GetComputeCluster(ctx context.Context, id string) (*models.ComputeCluster, error) { + c, err := s.clusters.FindByID(ctx, id) + if err != nil { + return nil, fmt.Errorf("get compute cluster: %w", err) + } + if c == nil { + return nil, ErrNotFound + } + return c, nil +} + +// GetComputeClusterByName retrieves a compute cluster by its name. +func (s *Service) GetComputeClusterByName(ctx context.Context, name string) (*models.ComputeCluster, error) { + c, err := s.clusters.FindByName(ctx, name) + if err != nil { + return nil, fmt.Errorf("get compute cluster by name: %w", err) + } + if c == nil { + return nil, ErrNotFound + } + return c, nil +} + +// ListComputeClusters returns every compute cluster, ordered by name. +func (s *Service) ListComputeClusters(ctx context.Context) ([]models.ComputeCluster, error) { + clusters, err := s.clusters.List(ctx) + if err != nil { + return nil, fmt.Errorf("list compute clusters: %w", err) + } + return clusters, nil +} + +// UpdateComputeCluster persists changes to an existing compute cluster. +func (s *Service) UpdateComputeCluster(ctx context.Context, cluster *models.ComputeCluster) error { + if cluster == nil || cluster.ID == "" { + return fmt.Errorf("%w: compute cluster id is required", ErrInvalidInput) + } + if cluster.Name == "" { + return fmt.Errorf("%w: compute cluster name is required", ErrInvalidInput) + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.clusters.Update(ctx, tx, cluster) + }); err != nil { + return fmt.Errorf("update compute cluster: %w", err) + } + return nil +} + +// DeleteComputeCluster removes a compute cluster by ID. +func (s *Service) DeleteComputeCluster(ctx context.Context, id string) error { + if id == "" { + return fmt.Errorf("%w: compute cluster id is required", ErrInvalidInput) + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.clusters.Delete(ctx, tx, id) + }); err != nil { + return fmt.Errorf("delete compute cluster: %w", err) + } + return nil +} diff --git a/pkg/service/service.go b/pkg/service/service.go index 7250e493f..b5926858e 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -33,28 +33,30 @@ import ( // mutating operation in a transaction so callers do not need to manage // *sql.Tx themselves. type Service struct { - db *sqlx.DB - orgs store.OrganizationStore - users store.UserStore - projs store.ProjectStore + db *sqlx.DB + orgs store.OrganizationStore + users store.UserStore + projs store.ProjectStore + clusters store.ComputeClusterStore } // New constructs a Service backed by the supplied database handle. // Stores are instantiated internally using the default MySQL implementations. func New(database *sqlx.DB) *Service { return &Service{ - db: database, - orgs: store.NewOrganizationStore(database), - users: store.NewUserStore(database), - projs: store.NewProjectStore(database), + db: database, + orgs: store.NewOrganizationStore(database), + users: store.NewUserStore(database), + projs: store.NewProjectStore(database), + clusters: store.NewComputeClusterStore(database), } } // NewWithStores constructs a Service from explicit stores. Useful for tests // within this module — stores are an internal type and cannot be supplied by // external callers. -func NewWithStores(database *sqlx.DB, orgs store.OrganizationStore, users store.UserStore, projs store.ProjectStore) *Service { - return &Service{db: database, orgs: orgs, users: users, projs: projs} +func NewWithStores(database *sqlx.DB, orgs store.OrganizationStore, users store.UserStore, projs store.ProjectStore, clusters store.ComputeClusterStore) *Service { + return &Service{db: database, orgs: orgs, users: users, projs: projs, clusters: clusters} } // inTx runs fn inside a database transaction managed by the Service.
