This is an automated email from the ASF dual-hosted git repository. DImuthuUpe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 6b7e6daa6e18ca5da1233ae5bfc4af34f059eeb2 Author: DImuthuUpe <[email protected]> AuthorDate: Mon May 18 23:47:59 2026 -0400 Adding SLURM association when an allocation membership is created --- .../internal/subscribers/members.go | 64 +++++++ .../internal/subscribers/subscriber.go | 1 + docs/API-Docs.md | 195 ++++++++++++++++++++- .../000010_compute_cluster_users.down.sql | 18 ++ .../migrations/000010_compute_cluster_users.up.sql | 31 ++++ internal/server/server.go | 80 +++++++++ internal/store/compute_cluster_user_store.go | 116 ++++++++++++ internal/store/store.go | 19 ++ pkg/events/compute_cluster_user_subscribe.go | 64 +++++++ pkg/events/types.go | 7 + pkg/models/allocation.go | 7 + pkg/service/compute_cluster_user.go | 179 +++++++++++++++++++ pkg/service/service.go | 4 + 13 files changed, 784 insertions(+), 1 deletion(-) diff --git a/connectors/SLURM/Association-Mapper/internal/subscribers/members.go b/connectors/SLURM/Association-Mapper/internal/subscribers/members.go new file mode 100644 index 000000000..cd609d5af --- /dev/null +++ b/connectors/SLURM/Association-Mapper/internal/subscribers/members.go @@ -0,0 +1,64 @@ +package subscribers + +import ( + "log/slog" + + "context" + client "github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/operations" + "github.com/apache/airavata-custos/pkg/models" + "time" +) + +func (a *AssociationSubscriber) SubscribeToComputeAllocationMembershipCreation(membership models.ComputeAllocationMembership) { + slog.Info("Received compute allocation membership creation event", "membership", membership) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + allocation, err := a.coreService.GetComputeAllocation(ctx, membership.ComputeAllocationID) + if err != nil { + slog.Error("Failed to get compute allocation", "error", err) + return + } + + cluster, err := a.coreService.GetComputeCluster(ctx, allocation.ComputeClusterID) + if err != nil { + slog.Error("Failed to get compute cluster", "error", err) + return + } + + user, err := a.coreService.GetUser(ctx, membership.UserID) + if err != nil { + slog.Error("Failed to get user", "error", err) + return + } + + csu, err := a.coreService.GetComputeClusterUserByPair(ctx, cluster.ID, user.ID) // TODO: use this to get the local username for the association instead of assuming it's the same as the Airavata Custos username + if err != nil { + slog.Error("Failed to get compute cluster user by pair", "error", err) + return + } + + resources, err := a.coreService.ListResourcesForAllocation(ctx, allocation.ID) // TODO: use this to get the partition for the association instead of hardcoding it to "default" + if err != nil { + slog.Error("Failed to list resources for allocation", "error", err) + return + } + + if len(resources) == 0 { + slog.Warn("No resources found for allocation, defaulting to partition 'default'", "allocation_id", allocation.ID) + } + association := client.Association{ + Account: allocation.Name, + Cluster: cluster.Name, + User: csu.LocalUsername, + Partition: resources[0].Name, + } + + err = a.slurmClient.UpsertAssociation(association) + if err != nil { + slog.Error("Failed to upsert association", "error", err) + } else { + slog.Info("Successfully upserted association", "association", association) + } +} diff --git a/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go b/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go index ce5161fd7..658dcc073 100644 --- a/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go +++ b/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go @@ -22,4 +22,5 @@ func (a *AssociationSubscriber) RegisterSubscribers() { a.eventBus.SubscribeComputeAllocationCreated(a.SubscribeToComputeAllocationCreation) a.eventBus.SubscribeComputeAllocationDeleted(a.SubscribeToComputeAllocationDeletion) a.eventBus.SubscribeComputeAllocationUpdated(a.SubscribeToComputeAllocationUpdate) + a.eventBus.SubscribeComputeAllocationMembershipCreated(a.SubscribeToComputeAllocationMembershipCreation) } diff --git a/docs/API-Docs.md b/docs/API-Docs.md index 6931d0304..81aa716db 100644 --- a/docs/API-Docs.md +++ b/docs/API-Docs.md @@ -330,6 +330,159 @@ Retrieve a single compute cluster by its ID. --- +## Compute Cluster Users + +A compute-cluster user maps a Custos user to their local account +(`local_username`) on a specific compute cluster. Each `(compute_cluster_id, +user_id)` pair is unique. The mapping is removed automatically when either +the referenced compute cluster or user is deleted. + +### `POST /compute-cluster-users` + +Create a new compute-cluster user mapping. + +**Required fields:** `compute_cluster_id`, `user_id`, `local_username` +**Optional fields:** `id` (auto-generated if omitted) + +Both `compute_cluster_id` and `user_id` must reference existing records. + +**Request** + +```json +{ + "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1", + "user_id": "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210", + "local_username": "jdoe" +} +``` + +**Response 201** + +```json +{ + "id": "0d72d3b1-6f1a-4a92-9c4e-1d7a4b5f9c2d", + "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1", + "user_id": "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210", + "local_username": "jdoe" +} +``` + +**Errors** + +- `400` — `compute_cluster_id`, `user_id`, or `local_username` is missing, + or the referenced cluster/user does not exist. +- `409` — this user is already mapped on the given compute cluster. + +--- + +### `GET /compute-cluster-users/{id}` + +Retrieve a single compute-cluster user mapping by its ID. + +**Errors** + +- `404` — no mapping matches the supplied ID. + +--- + +### `PUT /compute-cluster-users/{id}` + +Replace mutable fields of an existing compute-cluster user mapping. The +path `{id}` overrides any `id` in the request body. + +**Required fields:** `compute_cluster_id`, `user_id`, `local_username` + +**Request** + +```json +{ + "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1", + "user_id": "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210", + "local_username": "jane.doe" +} +``` + +**Errors** + +- `400` — required fields missing. + +--- + +### `DELETE /compute-cluster-users/{id}` + +Remove a compute-cluster user mapping. + +**Response 204** — empty body. + +**Errors** + +- `404` — no mapping matches the supplied ID. + +--- + +### `GET /compute-clusters/{id}/users` + +List every user mapping for the given compute cluster, ordered by +`local_username`. + +**Response 200** + +```json +[ + { + "id": "0d72d3b1-6f1a-4a92-9c4e-1d7a4b5f9c2d", + "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1", + "user_id": "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210", + "local_username": "jdoe" + } +] +``` + +--- + +### `GET /compute-clusters/{id}/users/{userId}` + +Look up the single compute-cluster user mapping for the given +`(compute_cluster_id, user_id)` pair. + +**Response 200** + +```json +{ + "id": "0d72d3b1-6f1a-4a92-9c4e-1d7a4b5f9c2d", + "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1", + "user_id": "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210", + "local_username": "jdoe" +} +``` + +**Errors** + +- `400` — `compute_cluster_id` or `user_id` is missing. +- `404` — no mapping exists for the given pair. + +--- + +### `GET /users/{id}/compute-cluster-users` + +List every cluster mapping held by the given Custos user, ordered by +`compute_cluster_id`. + +**Response 200** + +```json +[ + { + "id": "0d72d3b1-6f1a-4a92-9c4e-1d7a4b5f9c2d", + "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1", + "user_id": "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210", + "local_username": "jdoe" + } +] +``` + +--- + ## Compute Allocations A compute allocation grants a project a budget of Service Units (SUs) on a @@ -1076,7 +1229,7 @@ ALLOC_ID=$(curl -s -X POST $BASE/compute-allocations \ RES_ID=$(curl -s -X POST $BASE/compute-allocation-resources \ -H 'Content-Type: application/json' \ - -d '{"name":"GPU B200","resource_type":"GPU","resource_amount":8}' | jq -r .id) + -d '{"name":"debug","resource_type":"CPU","resource_amount":24}' | jq -r .id) # Attach the resource to the allocation. curl -s -X POST $BASE/compute-allocations/$ALLOC_ID/resources \ @@ -1096,6 +1249,46 @@ curl -s -X POST $BASE/compute-allocation-resource-rates \ # Look up the currently-effective rate. curl -s $BASE/compute-allocation-resources/$RES_ID/rates/effective | jq +# Create a Custos user that will be granted access on the cluster. +CLUSTER_USER_ACCT_ID=$(curl -s -X POST $BASE/users \ + -H 'Content-Type: application/json' \ + -d "{\"organization_id\":\"$ORG_ID\",\"first_name\":\"Dimuthu\",\"last_name\":\"Wannipurage\",\"email\":\"[email protected]\"}" \ + | jq -r .id) + +# Map the Custos user to a local UNIX account on the cluster. +CLUSTER_USER_ID=$(curl -s -X POST $BASE/compute-cluster-users \ + -H 'Content-Type: application/json' \ + -d "{\"compute_cluster_id\":\"$CLUSTER_ID\",\"user_id\":\"$CLUSTER_USER_ACCT_ID\",\"local_username\":\"dimuthu\"}" \ + | jq -r .id) + +# List all user mappings on the cluster. +curl -s $BASE/compute-clusters/$CLUSTER_ID/users | jq + +# Grant the user a sub-allocation (membership) on the compute allocation. +MEMBERSHIP_ID=$(curl -s -X POST $BASE/compute-allocation-memberships \ + -H 'Content-Type: application/json' \ + -d "{ + \"compute_allocation_id\":\"$ALLOC_ID\", + \"user_id\":\"$CLUSTER_USER_ACCT_ID\", + \"allocation_amount\":50000, + \"start_time\":\"2026-04-01T00:00:00Z\", + \"end_time\":\"2026-06-30T23:59:59Z\", + \"membership_status\":\"ACTIVE\" + }" | jq -r .id) + +# List every member of the allocation. +curl -s $BASE/compute-allocations/$ALLOC_ID/memberships | jq + +# List every allocation this user is a member of. +curl -s $BASE/users/$CLUSTER_USER_ACCT_ID/compute-allocation-memberships | jq + +# Bump the user's SU sub-allocation. +curl -s -X PUT $BASE/compute-allocation-memberships/$MEMBERSHIP_ID/allocation-amount \ + -H 'Content-Type: application/json' \ + -d '{"allocation_amount":75000}' | jq + + + # Record a usage diff against the allocation. curl -s -X POST $BASE/compute-allocation-diffs \ -H 'Content-Type: application/json' \ diff --git a/internal/db/migrations/000010_compute_cluster_users.down.sql b/internal/db/migrations/000010_compute_cluster_users.down.sql new file mode 100644 index 000000000..c207cd7c1 --- /dev/null +++ b/internal/db/migrations/000010_compute_cluster_users.down.sql @@ -0,0 +1,18 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +DROP TABLE IF EXISTS compute_cluster_users; diff --git a/internal/db/migrations/000010_compute_cluster_users.up.sql b/internal/db/migrations/000010_compute_cluster_users.up.sql new file mode 100644 index 000000000..284536208 --- /dev/null +++ b/internal/db/migrations/000010_compute_cluster_users.up.sql @@ -0,0 +1,31 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +CREATE TABLE IF NOT EXISTS compute_cluster_users +( + id VARCHAR(255) NOT NULL, + compute_cluster_id VARCHAR(255) NOT NULL, + user_id VARCHAR(255) NOT NULL, + local_username VARCHAR(255) NOT NULL, + created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6), + PRIMARY KEY (id), + UNIQUE KEY uq_compute_cluster_users_pair (compute_cluster_id, user_id), + KEY idx_compute_cluster_users_user (user_id), + CONSTRAINT fk_compute_cluster_users_cluster FOREIGN KEY (compute_cluster_id) REFERENCES compute_clusters (id) ON DELETE CASCADE, + CONSTRAINT fk_compute_cluster_users_user FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci; diff --git a/internal/server/server.go b/internal/server/server.go index fb98b7320..4b3e20fd2 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -64,6 +64,14 @@ func (s *Server) routes() { s.mux.HandleFunc("GET /compute-clusters", s.listComputeClusters) s.mux.HandleFunc("GET /compute-clusters/{id}", s.getComputeCluster) + s.mux.HandleFunc("POST /compute-cluster-users", s.createComputeClusterUser) + s.mux.HandleFunc("GET /compute-cluster-users/{id}", s.getComputeClusterUser) + s.mux.HandleFunc("PUT /compute-cluster-users/{id}", s.updateComputeClusterUser) + s.mux.HandleFunc("DELETE /compute-cluster-users/{id}", s.deleteComputeClusterUser) + s.mux.HandleFunc("GET /compute-clusters/{id}/users", s.listComputeClusterUsersByCluster) + s.mux.HandleFunc("GET /compute-clusters/{id}/users/{userId}", s.getComputeClusterUserByPair) + s.mux.HandleFunc("GET /users/{id}/compute-cluster-users", s.listComputeClusterUsersByUser) + s.mux.HandleFunc("POST /compute-allocations", s.createComputeAllocation) s.mux.HandleFunc("GET /compute-allocations/{id}", s.getComputeAllocation) @@ -223,6 +231,78 @@ func (s *Server) listComputeClusters(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, clusters) } +func (s *Server) createComputeClusterUser(w http.ResponseWriter, r *http.Request) { + var cu models.ComputeClusterUser + if err := decodeJSON(r, &cu); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + created, err := s.svc.CreateComputeClusterUser(r.Context(), &cu) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusCreated, created) +} + +func (s *Server) getComputeClusterUser(w http.ResponseWriter, r *http.Request) { + cu, err := s.svc.GetComputeClusterUser(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, cu) +} + +func (s *Server) updateComputeClusterUser(w http.ResponseWriter, r *http.Request) { + var cu models.ComputeClusterUser + if err := decodeJSON(r, &cu); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + cu.ID = r.PathValue("id") + if err := s.svc.UpdateComputeClusterUser(r.Context(), &cu); err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, &cu) +} + +func (s *Server) deleteComputeClusterUser(w http.ResponseWriter, r *http.Request) { + if err := s.svc.DeleteComputeClusterUser(r.Context(), r.PathValue("id")); err != nil { + writeServiceError(w, err) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func (s *Server) listComputeClusterUsersByCluster(w http.ResponseWriter, r *http.Request) { + users, err := s.svc.ListComputeClusterUsersByCluster(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, users) +} + +func (s *Server) getComputeClusterUserByPair(w http.ResponseWriter, r *http.Request) { + cu, err := s.svc.GetComputeClusterUserByPair(r.Context(), r.PathValue("id"), r.PathValue("userId")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, cu) +} + +func (s *Server) listComputeClusterUsersByUser(w http.ResponseWriter, r *http.Request) { + users, err := s.svc.ListComputeClusterUsersByUser(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, users) +} + func (s *Server) createComputeAllocation(w http.ResponseWriter, r *http.Request) { var a models.ComputeAllocation if err := decodeJSON(r, &a); err != nil { diff --git a/internal/store/compute_cluster_user_store.go b/internal/store/compute_cluster_user_store.go new file mode 100644 index 000000000..f5926ad1e --- /dev/null +++ b/internal/store/compute_cluster_user_store.go @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package store + +import ( + "context" + "database/sql" + "errors" + + "github.com/jmoiron/sqlx" + + "github.com/apache/airavata-custos/pkg/models" +) + +type mysqlComputeClusterUserStore struct { + db *sqlx.DB +} + +// NewComputeClusterUserStore returns a MySQL-backed ComputeClusterUserStore. +func NewComputeClusterUserStore(db *sqlx.DB) ComputeClusterUserStore { + return &mysqlComputeClusterUserStore{db: db} +} + +func (s *mysqlComputeClusterUserStore) FindByID(ctx context.Context, id string) (*models.ComputeClusterUser, error) { + var c models.ComputeClusterUser + err := s.db.GetContext(ctx, &c, + `SELECT id, compute_cluster_id, user_id, local_username + FROM compute_cluster_users WHERE id = ?`, id) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &c, nil +} + +func (s *mysqlComputeClusterUserStore) FindByPair(ctx context.Context, clusterID, userID string) (*models.ComputeClusterUser, error) { + var c models.ComputeClusterUser + err := s.db.GetContext(ctx, &c, + `SELECT id, compute_cluster_id, user_id, local_username + FROM compute_cluster_users + WHERE compute_cluster_id = ? AND user_id = ?`, clusterID, userID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &c, nil +} + +func (s *mysqlComputeClusterUserStore) FindByCluster(ctx context.Context, clusterID string) ([]models.ComputeClusterUser, error) { + var users []models.ComputeClusterUser + err := s.db.SelectContext(ctx, &users, + `SELECT id, compute_cluster_id, user_id, local_username + FROM compute_cluster_users + WHERE compute_cluster_id = ? + ORDER BY local_username`, clusterID) + if err != nil { + return nil, err + } + return users, nil +} + +func (s *mysqlComputeClusterUserStore) FindByUser(ctx context.Context, userID string) ([]models.ComputeClusterUser, error) { + var users []models.ComputeClusterUser + err := s.db.SelectContext(ctx, &users, + `SELECT id, compute_cluster_id, user_id, local_username + FROM compute_cluster_users + WHERE user_id = ? + ORDER BY compute_cluster_id`, userID) + if err != nil { + return nil, err + } + return users, nil +} + +func (s *mysqlComputeClusterUserStore) Create(ctx context.Context, tx *sql.Tx, c *models.ComputeClusterUser) error { + _, err := tx.ExecContext(ctx, + `INSERT INTO compute_cluster_users (id, compute_cluster_id, user_id, local_username) + VALUES (?, ?, ?, ?)`, + c.ID, c.ComputeClusterID, c.UserID, c.LocalUsername) + return err +} + +func (s *mysqlComputeClusterUserStore) Update(ctx context.Context, tx *sql.Tx, c *models.ComputeClusterUser) error { + _, err := tx.ExecContext(ctx, + `UPDATE compute_cluster_users + SET compute_cluster_id = ?, + user_id = ?, + local_username = ? + WHERE id = ?`, + c.ComputeClusterID, c.UserID, c.LocalUsername, c.ID) + return err +} + +func (s *mysqlComputeClusterUserStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { + _, err := tx.ExecContext(ctx, `DELETE FROM compute_cluster_users WHERE id = ?`, id) + return err +} diff --git a/internal/store/store.go b/internal/store/store.go index ba6faa9f0..4dddec180 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -71,6 +71,25 @@ type ComputeClusterStore interface { Delete(ctx context.Context, tx *sql.Tx, id string) error } +// ComputeClusterUserStore defines persistence operations for the mapping of +// a Custos user to their local account on a compute cluster. +type ComputeClusterUserStore interface { + // FindByID returns the mapping with the given ID, or nil if not found. + FindByID(ctx context.Context, id string) (*models.ComputeClusterUser, error) + // FindByPair returns the mapping for a (compute_cluster_id, user_id) pair, or nil if absent. + FindByPair(ctx context.Context, clusterID, userID string) (*models.ComputeClusterUser, error) + // FindByCluster returns every user mapping for the given compute cluster. + FindByCluster(ctx context.Context, clusterID string) ([]models.ComputeClusterUser, error) + // FindByUser returns every cluster mapping held by the given Custos user. + FindByUser(ctx context.Context, userID string) ([]models.ComputeClusterUser, error) + // Create inserts a new mapping within the provided transaction. + Create(ctx context.Context, tx *sql.Tx, c *models.ComputeClusterUser) error + // Update replaces mutable fields of an existing mapping within the provided transaction. + Update(ctx context.Context, tx *sql.Tx, c *models.ComputeClusterUser) error + // Delete removes a mapping by ID within the provided transaction. + Delete(ctx context.Context, tx *sql.Tx, id string) error +} + // ProjectStore defines persistence operations for projects. type ProjectStore interface { // FindByID returns the project with the given ID, or nil if not found. diff --git a/pkg/events/compute_cluster_user_subscribe.go b/pkg/events/compute_cluster_user_subscribe.go new file mode 100644 index 000000000..5ba7a11a7 --- /dev/null +++ b/pkg/events/compute_cluster_user_subscribe.go @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// ComputeClusterUserHandler handles compute-cluster user lifecycle events +// with a typed payload. +type ComputeClusterUserHandler func(user models.ComputeClusterUser) + +// SubscribeComputeClusterUserCreated registers a typed handler invoked +// whenever a compute_cluster_user::create event is published. +func (b *Bus) SubscribeComputeClusterUserCreated(handler ComputeClusterUserHandler) { + b.subscribeComputeClusterUser(ComputeClusterUserCreateEvent, handler) +} + +// SubscribeComputeClusterUserUpdated registers a typed handler invoked +// whenever a compute_cluster_user::update event is published. +func (b *Bus) SubscribeComputeClusterUserUpdated(handler ComputeClusterUserHandler) { + b.subscribeComputeClusterUser(ComputeClusterUserUpdateEvent, handler) +} + +// SubscribeComputeClusterUserDeleted registers a typed handler invoked +// whenever a compute_cluster_user::delete event is published. +func (b *Bus) SubscribeComputeClusterUserDeleted(handler ComputeClusterUserHandler) { + b.subscribeComputeClusterUser(ComputeClusterUserDeleteEvent, handler) +} + +func (b *Bus) subscribeComputeClusterUser(topic EventType, handler ComputeClusterUserHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch u := value.(type) { + case models.ComputeClusterUser: + handler(u) + case *models.ComputeClusterUser: + if u != nil { + handler(*u) + } + default: + slog.Warn("compute cluster user event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/events/types.go b/pkg/events/types.go index 11c900e94..44758f0e4 100644 --- a/pkg/events/types.go +++ b/pkg/events/types.go @@ -54,6 +54,13 @@ const ( ComputeClusterDeleteEvent EventType = "compute_cluster::delete" ) +// ComputeClusterUser lifecycle message types. +const ( + ComputeClusterUserCreateEvent EventType = "compute_cluster_user::create" + ComputeClusterUserUpdateEvent EventType = "compute_cluster_user::update" + ComputeClusterUserDeleteEvent EventType = "compute_cluster_user::delete" +) + // ClusterAccount lifecycle message types. const ( ClusterAccountCreateEvent EventType = "cluster_account::create" diff --git a/pkg/models/allocation.go b/pkg/models/allocation.go index 1037b1d7f..7813dcc2a 100644 --- a/pkg/models/allocation.go +++ b/pkg/models/allocation.go @@ -15,6 +15,13 @@ type ComputeCluster struct { Name string `json:"name" db:"name"` // A human-readable name for the compute cluster, e.g., "Cluster A", "Cluster B", etc. } +type ComputeClusterUser struct { + ID string `json:"id" db:"id"` + ComputeClusterID string `json:"compute_cluster_id" db:"compute_cluster_id"` + UserID string `json:"user_id" db:"user_id"` + LocalUsername string `json:"local_username" db:"local_username"` // The username of the user on the compute cluster, which may be different from their Airavata Custos username. +} + type ComputeAllocation struct { ID string `json:"id" db:"id"` ProjectID string `json:"project_id" db:"project_id"` diff --git a/pkg/service/compute_cluster_user.go b/pkg/service/compute_cluster_user.go new file mode 100644 index 000000000..3e8ed9868 --- /dev/null +++ b/pkg/service/compute_cluster_user.go @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package service + +import ( + "context" + "database/sql" + "fmt" + + "github.com/apache/airavata-custos/pkg/events" + "github.com/apache/airavata-custos/pkg/models" +) + +// CreateComputeClusterUser persists a new compute-cluster user mapping. If +// the ID is empty, a UUID is generated. The (possibly populated) record is +// returned. +func (s *Service) CreateComputeClusterUser(ctx context.Context, cu *models.ComputeClusterUser) (*models.ComputeClusterUser, error) { + if cu == nil { + return nil, fmt.Errorf("%w: compute cluster user is nil", ErrInvalidInput) + } + if cu.ComputeClusterID == "" { + return nil, fmt.Errorf("%w: compute_cluster_id is required", ErrInvalidInput) + } + if cu.UserID == "" { + return nil, fmt.Errorf("%w: user_id is required", ErrInvalidInput) + } + if cu.LocalUsername == "" { + return nil, fmt.Errorf("%w: local_username is required", ErrInvalidInput) + } + if cu.ID == "" { + cu.ID = newID() + } + + if cluster, err := s.clusters.FindByID(ctx, cu.ComputeClusterID); err != nil { + return nil, fmt.Errorf("lookup compute cluster: %w", err) + } else if cluster == nil { + return nil, fmt.Errorf("%w: compute cluster %q not found", ErrInvalidInput, cu.ComputeClusterID) + } + if user, err := s.users.FindByID(ctx, cu.UserID); err != nil { + return nil, fmt.Errorf("lookup user: %w", err) + } else if user == nil { + return nil, fmt.Errorf("%w: user %q not found", ErrInvalidInput, cu.UserID) + } + + if existing, err := s.clusterUsers.FindByPair(ctx, cu.ComputeClusterID, cu.UserID); err != nil { + return nil, fmt.Errorf("lookup compute cluster user by pair: %w", err) + } else if existing != nil { + return nil, fmt.Errorf("%w: user %q is already mapped on cluster %q", + ErrAlreadyExists, cu.UserID, cu.ComputeClusterID) + } + + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.clusterUsers.Create(ctx, tx, cu) + }); err != nil { + return nil, fmt.Errorf("create compute cluster user: %w", err) + } + + s.eventBus.Publish(events.ComputeClusterUserCreateEvent, cu) + return cu, nil +} + +// GetComputeClusterUser retrieves a compute-cluster user by its ID. +func (s *Service) GetComputeClusterUser(ctx context.Context, id string) (*models.ComputeClusterUser, error) { + c, err := s.clusterUsers.FindByID(ctx, id) + if err != nil { + return nil, fmt.Errorf("get compute cluster user: %w", err) + } + if c == nil { + return nil, ErrNotFound + } + return c, nil +} + +// GetComputeClusterUserByPair retrieves the compute-cluster user mapping for +// the given (compute_cluster_id, user_id) pair. +func (s *Service) GetComputeClusterUserByPair(ctx context.Context, clusterID, userID string) (*models.ComputeClusterUser, error) { + if clusterID == "" { + return nil, fmt.Errorf("%w: compute_cluster_id is required", ErrInvalidInput) + } + if userID == "" { + return nil, fmt.Errorf("%w: user_id is required", ErrInvalidInput) + } + c, err := s.clusterUsers.FindByPair(ctx, clusterID, userID) + if err != nil { + return nil, fmt.Errorf("get compute cluster user by pair: %w", err) + } + if c == nil { + return nil, ErrNotFound + } + return c, nil +} + +// ListComputeClusterUsersByCluster returns every user mapping for the given +// compute cluster, ordered by local username. +func (s *Service) ListComputeClusterUsersByCluster(ctx context.Context, clusterID string) ([]models.ComputeClusterUser, error) { + if clusterID == "" { + return nil, fmt.Errorf("%w: compute_cluster_id is required", ErrInvalidInput) + } + users, err := s.clusterUsers.FindByCluster(ctx, clusterID) + if err != nil { + return nil, fmt.Errorf("list compute cluster users by cluster: %w", err) + } + return users, nil +} + +// ListComputeClusterUsersByUser returns every cluster mapping held by the +// given Custos user. +func (s *Service) ListComputeClusterUsersByUser(ctx context.Context, userID string) ([]models.ComputeClusterUser, error) { + if userID == "" { + return nil, fmt.Errorf("%w: user_id is required", ErrInvalidInput) + } + users, err := s.clusterUsers.FindByUser(ctx, userID) + if err != nil { + return nil, fmt.Errorf("list compute cluster users by user: %w", err) + } + return users, nil +} + +// UpdateComputeClusterUser persists changes to an existing compute-cluster +// user mapping. +func (s *Service) UpdateComputeClusterUser(ctx context.Context, cu *models.ComputeClusterUser) error { + if cu == nil || cu.ID == "" { + return fmt.Errorf("%w: compute cluster user id is required", ErrInvalidInput) + } + if cu.ComputeClusterID == "" { + return fmt.Errorf("%w: compute_cluster_id is required", ErrInvalidInput) + } + if cu.UserID == "" { + return fmt.Errorf("%w: user_id is required", ErrInvalidInput) + } + if cu.LocalUsername == "" { + return fmt.Errorf("%w: local_username is required", ErrInvalidInput) + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.clusterUsers.Update(ctx, tx, cu) + }); err != nil { + return fmt.Errorf("update compute cluster user: %w", err) + } + + s.eventBus.Publish(events.ComputeClusterUserUpdateEvent, cu) + return nil +} + +// DeleteComputeClusterUser removes a compute-cluster user mapping by ID. +func (s *Service) DeleteComputeClusterUser(ctx context.Context, id string) error { + if id == "" { + return fmt.Errorf("%w: compute cluster user id is required", ErrInvalidInput) + } + cu, err := s.clusterUsers.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup compute cluster user: %w", err) + } + if cu == nil { + return ErrNotFound + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.clusterUsers.Delete(ctx, tx, id) + }); err != nil { + return fmt.Errorf("delete compute cluster user: %w", err) + } + + s.eventBus.Publish(events.ComputeClusterUserDeleteEvent, cu) + return nil +} diff --git a/pkg/service/service.go b/pkg/service/service.go index 935630ac5..e187ff82f 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -40,6 +40,7 @@ type Service struct { users store.UserStore projs store.ProjectStore clusters store.ComputeClusterStore + clusterUsers store.ComputeClusterUserStore allocs store.ComputeAllocationStore resources store.ComputeAllocationResourceStore resourceMappings store.ComputeAllocationResourceMappingStore @@ -61,6 +62,7 @@ func New(database *sqlx.DB, eventBus *events.Bus) *Service { users: store.NewUserStore(database), projs: store.NewProjectStore(database), clusters: store.NewComputeClusterStore(database), + clusterUsers: store.NewComputeClusterUserStore(database), allocs: store.NewComputeAllocationStore(database), resources: store.NewComputeAllocationResourceStore(database), resourceMappings: store.NewComputeAllocationResourceMappingStore(database), @@ -83,6 +85,7 @@ func NewWithStores( users store.UserStore, projs store.ProjectStore, clusters store.ComputeClusterStore, + clusterUsers store.ComputeClusterUserStore, allocs store.ComputeAllocationStore, resources store.ComputeAllocationResourceStore, resourceMappings store.ComputeAllocationResourceMappingStore, @@ -100,6 +103,7 @@ func NewWithStores( users: users, projs: projs, clusters: clusters, + clusterUsers: clusterUsers, allocs: allocs, resources: resources, resourceMappings: resourceMappings,
