This is an automated email from the ASF dual-hosted git repository.

DImuthuUpe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit 6b7e6daa6e18ca5da1233ae5bfc4af34f059eeb2
Author: DImuthuUpe <[email protected]>
AuthorDate: Mon May 18 23:47:59 2026 -0400

    Adding SLURM association when an allocation membership is created
---
 .../internal/subscribers/members.go                |  64 +++++++
 .../internal/subscribers/subscriber.go             |   1 +
 docs/API-Docs.md                                   | 195 ++++++++++++++++++++-
 .../000010_compute_cluster_users.down.sql          |  18 ++
 .../migrations/000010_compute_cluster_users.up.sql |  31 ++++
 internal/server/server.go                          |  80 +++++++++
 internal/store/compute_cluster_user_store.go       | 116 ++++++++++++
 internal/store/store.go                            |  19 ++
 pkg/events/compute_cluster_user_subscribe.go       |  64 +++++++
 pkg/events/types.go                                |   7 +
 pkg/models/allocation.go                           |   7 +
 pkg/service/compute_cluster_user.go                | 179 +++++++++++++++++++
 pkg/service/service.go                             |   4 +
 13 files changed, 784 insertions(+), 1 deletion(-)

diff --git 
a/connectors/SLURM/Association-Mapper/internal/subscribers/members.go 
b/connectors/SLURM/Association-Mapper/internal/subscribers/members.go
new file mode 100644
index 000000000..cd609d5af
--- /dev/null
+++ b/connectors/SLURM/Association-Mapper/internal/subscribers/members.go
@@ -0,0 +1,64 @@
+package subscribers
+
+import (
+       "log/slog"
+
+       "context"
+       client 
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/operations"
+       "github.com/apache/airavata-custos/pkg/models"
+       "time"
+)
+
+// SubscribeToComputeAllocationMembershipCreation handles a compute allocation
+// membership creation event by upserting the corresponding SLURM association.
+//
+// It looks up the allocation, the allocation's cluster, the member user, and
+// the user's local account mapping on that cluster, then builds the
+// association from the allocation name (account), the cluster name, the
+// mapped local username, and a partition. The partition is the name of the
+// first resource attached to the allocation, or "default" when the allocation
+// has no resources.
+//
+// All lookups share one 10-second timeout. A lookup failure is logged and
+// aborts the handler; an upsert failure is logged as well.
+func (a *AssociationSubscriber) SubscribeToComputeAllocationMembershipCreation(membership models.ComputeAllocationMembership) {
+       slog.Info("Received compute allocation membership creation event", "membership", membership)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancel()
+
+       allocation, err := a.coreService.GetComputeAllocation(ctx, membership.ComputeAllocationID)
+       if err != nil {
+               slog.Error("Failed to get compute allocation", "error", err)
+               return
+       }
+
+       cluster, err := a.coreService.GetComputeCluster(ctx, allocation.ComputeClusterID)
+       if err != nil {
+               slog.Error("Failed to get compute cluster", "error", err)
+               return
+       }
+
+       user, err := a.coreService.GetUser(ctx, membership.UserID)
+       if err != nil {
+               slog.Error("Failed to get user", "error", err)
+               return
+       }
+
+       // The cluster-user mapping supplies the local username used in the
+       // association.
+       csu, err := a.coreService.GetComputeClusterUserByPair(ctx, cluster.ID, user.ID)
+       if err != nil {
+               slog.Error("Failed to get compute cluster user by pair", "error", err)
+               return
+       }
+
+       // The allocation's resources supply the partition for the association.
+       resources, err := a.coreService.ListResourcesForAllocation(ctx, allocation.ID)
+       if err != nil {
+               slog.Error("Failed to list resources for allocation", "error", err)
+               return
+       }
+
+       // Start from the "default" partition and override it only when a
+       // resource exists; indexing resources[0] on an empty slice would panic.
+       association := client.Association{
+               Account:   allocation.Name,
+               Cluster:   cluster.Name,
+               User:      csu.LocalUsername,
+               Partition: "default",
+       }
+       if len(resources) > 0 {
+               association.Partition = resources[0].Name
+       } else {
+               slog.Warn("No resources found for allocation, defaulting to partition 'default'", "allocation_id", allocation.ID)
+       }
+
+       if err := a.slurmClient.UpsertAssociation(association); err != nil {
+               slog.Error("Failed to upsert association", "error", err)
+               return
+       }
+       slog.Info("Successfully upserted association", "association", association)
+}
diff --git 
a/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go 
b/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go
index ce5161fd7..658dcc073 100644
--- a/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go
+++ b/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go
@@ -22,4 +22,5 @@ func (a *AssociationSubscriber) RegisterSubscribers() {
        
a.eventBus.SubscribeComputeAllocationCreated(a.SubscribeToComputeAllocationCreation)
        
a.eventBus.SubscribeComputeAllocationDeleted(a.SubscribeToComputeAllocationDeletion)
        
a.eventBus.SubscribeComputeAllocationUpdated(a.SubscribeToComputeAllocationUpdate)
+       
a.eventBus.SubscribeComputeAllocationMembershipCreated(a.SubscribeToComputeAllocationMembershipCreation)
 }
diff --git a/docs/API-Docs.md b/docs/API-Docs.md
index 6931d0304..81aa716db 100644
--- a/docs/API-Docs.md
+++ b/docs/API-Docs.md
@@ -330,6 +330,159 @@ Retrieve a single compute cluster by its ID.
 
 ---
 
+## Compute Cluster Users
+
+A compute-cluster user maps a Custos user to their local account
+(`local_username`) on a specific compute cluster. Each `(compute_cluster_id,
+user_id)` pair is unique. The mapping is removed automatically when either
+the referenced compute cluster or user is deleted.
+
+### `POST /compute-cluster-users`
+
+Create a new compute-cluster user mapping.
+
+**Required fields:** `compute_cluster_id`, `user_id`, `local_username`
+**Optional fields:** `id` (auto-generated if omitted)
+
+Both `compute_cluster_id` and `user_id` must reference existing records.
+
+**Request**
+
+```json
+{
+  "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1",
+  "user_id":            "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210",
+  "local_username":     "jdoe"
+}
+```
+
+**Response 201**
+
+```json
+{
+  "id":                 "0d72d3b1-6f1a-4a92-9c4e-1d7a4b5f9c2d",
+  "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1",
+  "user_id":            "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210",
+  "local_username":     "jdoe"
+}
+```
+
+**Errors**
+
+- `400` — `compute_cluster_id`, `user_id`, or `local_username` is missing,
+  or the referenced cluster/user does not exist.
+- `409` — this user is already mapped on the given compute cluster.
+
+---
+
+### `GET /compute-cluster-users/{id}`
+
+Retrieve a single compute-cluster user mapping by its ID.
+
+**Errors**
+
+- `404` — no mapping matches the supplied ID.
+
+---
+
+### `PUT /compute-cluster-users/{id}`
+
+Replace mutable fields of an existing compute-cluster user mapping. The
+path `{id}` overrides any `id` in the request body.
+
+**Required fields:** `compute_cluster_id`, `user_id`, `local_username`
+
+**Request**
+
+```json
+{
+  "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1",
+  "user_id":            "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210",
+  "local_username":     "jane.doe"
+}
+```
+
+**Errors**
+
+- `400` — required fields missing.
+
+---
+
+### `DELETE /compute-cluster-users/{id}`
+
+Remove a compute-cluster user mapping.
+
+**Response 204** — empty body.
+
+**Errors**
+
+- `404` — no mapping matches the supplied ID.
+
+---
+
+### `GET /compute-clusters/{id}/users`
+
+List every user mapping for the given compute cluster, ordered by
+`local_username`.
+
+**Response 200**
+
+```json
+[
+  {
+    "id":                 "0d72d3b1-6f1a-4a92-9c4e-1d7a4b5f9c2d",
+    "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1",
+    "user_id":            "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210",
+    "local_username":     "jdoe"
+  }
+]
+```
+
+---
+
+### `GET /compute-clusters/{id}/users/{userId}`
+
+Look up the single compute-cluster user mapping for the given
+`(compute_cluster_id, user_id)` pair.
+
+**Response 200**
+
+```json
+{
+  "id":                 "0d72d3b1-6f1a-4a92-9c4e-1d7a4b5f9c2d",
+  "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1",
+  "user_id":            "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210",
+  "local_username":     "jdoe"
+}
+```
+
+**Errors**
+
+- `400` — `compute_cluster_id` or `user_id` is missing.
+- `404` — no mapping exists for the given pair.
+
+---
+
+### `GET /users/{id}/compute-cluster-users`
+
+List every cluster mapping held by the given Custos user, ordered by
+`compute_cluster_id`.
+
+**Response 200**
+
+```json
+[
+  {
+    "id":                 "0d72d3b1-6f1a-4a92-9c4e-1d7a4b5f9c2d",
+    "compute_cluster_id": "9b0a7f1c-2c5d-4e1b-9a0f-22e8a5c2dcb1",
+    "user_id":            "5e2c7b3a-1d8f-4d2c-bf09-83a7c4d6e210",
+    "local_username":     "jdoe"
+  }
+]
+```
+
+---
+
 ## Compute Allocations
 
 A compute allocation grants a project a budget of Service Units (SUs) on a
@@ -1076,7 +1229,7 @@ ALLOC_ID=$(curl -s -X POST $BASE/compute-allocations \
 
 RES_ID=$(curl -s -X POST $BASE/compute-allocation-resources \
   -H 'Content-Type: application/json' \
-  -d '{"name":"GPU B200","resource_type":"GPU","resource_amount":8}' | jq -r 
.id)
+  -d '{"name":"debug","resource_type":"CPU","resource_amount":24}' | jq -r .id)
 
 # Attach the resource to the allocation.
 curl -s -X POST $BASE/compute-allocations/$ALLOC_ID/resources \
@@ -1096,6 +1249,46 @@ curl -s -X POST $BASE/compute-allocation-resource-rates \
 # Look up the currently-effective rate.
 curl -s $BASE/compute-allocation-resources/$RES_ID/rates/effective | jq
 
+# Create a Custos user that will be granted access on the cluster.
+CLUSTER_USER_ACCT_ID=$(curl -s -X POST $BASE/users \
+  -H 'Content-Type: application/json' \
+  -d "{\"organization_id\":\"$ORG_ID\",\"first_name\":\"Dimuthu\",\"last_name\":\"Wannipurage\",\"email\":\"[email protected]\"}" \
+  | jq -r .id)
+
+# Map the Custos user to a local UNIX account on the cluster.
+CLUSTER_USER_ID=$(curl -s -X POST $BASE/compute-cluster-users \
+  -H 'Content-Type: application/json' \
+  -d "{\"compute_cluster_id\":\"$CLUSTER_ID\",\"user_id\":\"$CLUSTER_USER_ACCT_ID\",\"local_username\":\"dimuthu\"}" \
+  | jq -r .id)
+
+# List all user mappings on the cluster.
+curl -s $BASE/compute-clusters/$CLUSTER_ID/users | jq
+
+# Grant the user a sub-allocation (membership) on the compute allocation.
+MEMBERSHIP_ID=$(curl -s -X POST $BASE/compute-allocation-memberships \
+  -H 'Content-Type: application/json' \
+  -d "{
+        \"compute_allocation_id\":\"$ALLOC_ID\",
+        \"user_id\":\"$CLUSTER_USER_ACCT_ID\",
+        \"allocation_amount\":50000,
+        \"start_time\":\"2026-04-01T00:00:00Z\",
+        \"end_time\":\"2026-06-30T23:59:59Z\",
+        \"membership_status\":\"ACTIVE\"
+      }" | jq -r .id)
+
+# List every member of the allocation.
+curl -s $BASE/compute-allocations/$ALLOC_ID/memberships | jq
+
+# List every allocation this user is a member of.
+curl -s $BASE/users/$CLUSTER_USER_ACCT_ID/compute-allocation-memberships | jq
+
+# Bump the user's SU sub-allocation.
+curl -s -X PUT $BASE/compute-allocation-memberships/$MEMBERSHIP_ID/allocation-amount \
+  -H 'Content-Type: application/json' \
+  -d '{"allocation_amount":75000}' | jq
+
+
+
 # Record a usage diff against the allocation.
 curl -s -X POST $BASE/compute-allocation-diffs \
   -H 'Content-Type: application/json' \
diff --git a/internal/db/migrations/000010_compute_cluster_users.down.sql 
b/internal/db/migrations/000010_compute_cluster_users.down.sql
new file mode 100644
index 000000000..c207cd7c1
--- /dev/null
+++ b/internal/db/migrations/000010_compute_cluster_users.down.sql
@@ -0,0 +1,18 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Reverts the compute_cluster_users migration by dropping the table.
+DROP TABLE IF EXISTS compute_cluster_users;
diff --git a/internal/db/migrations/000010_compute_cluster_users.up.sql 
b/internal/db/migrations/000010_compute_cluster_users.up.sql
new file mode 100644
index 000000000..284536208
--- /dev/null
+++ b/internal/db/migrations/000010_compute_cluster_users.up.sql
@@ -0,0 +1,31 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- compute_cluster_users maps a user to a local username on a compute cluster.
+-- At most one row may exist per (compute_cluster_id, user_id) pair, and rows
+-- are removed automatically when the referenced cluster or user is deleted.
+CREATE TABLE IF NOT EXISTS compute_cluster_users
+(
+    id                 VARCHAR(255) NOT NULL,
+    compute_cluster_id VARCHAR(255) NOT NULL,
+    user_id            VARCHAR(255) NOT NULL,
+    local_username     VARCHAR(255) NOT NULL,
+    created_at         TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+    updated_at         TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
+    PRIMARY KEY (id),
+    -- Enforces one mapping per cluster/user pair.
+    UNIQUE KEY uq_compute_cluster_users_pair (compute_cluster_id, user_id),
+    -- Supports lookups of every mapping held by a user.
+    KEY idx_compute_cluster_users_user (user_id),
+    CONSTRAINT fk_compute_cluster_users_cluster FOREIGN KEY (compute_cluster_id) REFERENCES compute_clusters (id) ON DELETE CASCADE,
+    CONSTRAINT fk_compute_cluster_users_user FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
diff --git a/internal/server/server.go b/internal/server/server.go
index fb98b7320..4b3e20fd2 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -64,6 +64,14 @@ func (s *Server) routes() {
        s.mux.HandleFunc("GET /compute-clusters", s.listComputeClusters)
        s.mux.HandleFunc("GET /compute-clusters/{id}", s.getComputeCluster)
 
+       s.mux.HandleFunc("POST /compute-cluster-users", 
s.createComputeClusterUser)
+       s.mux.HandleFunc("GET /compute-cluster-users/{id}", 
s.getComputeClusterUser)
+       s.mux.HandleFunc("PUT /compute-cluster-users/{id}", 
s.updateComputeClusterUser)
+       s.mux.HandleFunc("DELETE /compute-cluster-users/{id}", 
s.deleteComputeClusterUser)
+       s.mux.HandleFunc("GET /compute-clusters/{id}/users", 
s.listComputeClusterUsersByCluster)
+       s.mux.HandleFunc("GET /compute-clusters/{id}/users/{userId}", 
s.getComputeClusterUserByPair)
+       s.mux.HandleFunc("GET /users/{id}/compute-cluster-users", 
s.listComputeClusterUsersByUser)
+
        s.mux.HandleFunc("POST /compute-allocations", s.createComputeAllocation)
        s.mux.HandleFunc("GET /compute-allocations/{id}", 
s.getComputeAllocation)
 
@@ -223,6 +231,78 @@ func (s *Server) listComputeClusters(w 
http.ResponseWriter, r *http.Request) {
        writeJSON(w, http.StatusOK, clusters)
 }
 
+// createComputeClusterUser handles POST /compute-cluster-users. It decodes the
+// JSON body into a compute-cluster user, asks the service to create it, and
+// writes the created record with 201 Created. A body that fails to decode
+// yields 400 Bad Request; service errors are delegated to writeServiceError.
+func (s *Server) createComputeClusterUser(w http.ResponseWriter, r *http.Request) {
+       payload := new(models.ComputeClusterUser)
+       decodeErr := decodeJSON(r, payload)
+       if decodeErr != nil {
+               writeError(w, http.StatusBadRequest, decodeErr)
+               return
+       }
+
+       mapping, svcErr := s.svc.CreateComputeClusterUser(r.Context(), payload)
+       if svcErr != nil {
+               writeServiceError(w, svcErr)
+               return
+       }
+
+       writeJSON(w, http.StatusCreated, mapping)
+}
+
+// getComputeClusterUser handles GET /compute-cluster-users/{id}. It fetches
+// the mapping identified by the path ID and writes it with 200 OK; service
+// errors are delegated to writeServiceError.
+func (s *Server) getComputeClusterUser(w http.ResponseWriter, r *http.Request) {
+       id := r.PathValue("id")
+
+       mapping, svcErr := s.svc.GetComputeClusterUser(r.Context(), id)
+       if svcErr != nil {
+               writeServiceError(w, svcErr)
+               return
+       }
+
+       writeJSON(w, http.StatusOK, mapping)
+}
+
+// updateComputeClusterUser handles PUT /compute-cluster-users/{id}. It decodes
+// the JSON body, forces the record ID to the path ID (overriding any ID in the
+// body), asks the service to update it, and echoes the submitted record with
+// 200 OK. A body that fails to decode yields 400 Bad Request; service errors
+// are delegated to writeServiceError.
+func (s *Server) updateComputeClusterUser(w http.ResponseWriter, r *http.Request) {
+       payload := new(models.ComputeClusterUser)
+       decodeErr := decodeJSON(r, payload)
+       if decodeErr != nil {
+               writeError(w, http.StatusBadRequest, decodeErr)
+               return
+       }
+
+       // The path is authoritative for which record is being updated.
+       payload.ID = r.PathValue("id")
+
+       svcErr := s.svc.UpdateComputeClusterUser(r.Context(), payload)
+       if svcErr != nil {
+               writeServiceError(w, svcErr)
+               return
+       }
+
+       writeJSON(w, http.StatusOK, payload)
+}
+
+// deleteComputeClusterUser handles DELETE /compute-cluster-users/{id}. It asks
+// the service to delete the mapping identified by the path ID and replies
+// with 204 No Content; service errors are delegated to writeServiceError.
+func (s *Server) deleteComputeClusterUser(w http.ResponseWriter, r *http.Request) {
+       svcErr := s.svc.DeleteComputeClusterUser(r.Context(), r.PathValue("id"))
+       if svcErr != nil {
+               writeServiceError(w, svcErr)
+               return
+       }
+
+       w.WriteHeader(http.StatusNoContent)
+}
+
+// listComputeClusterUsersByCluster handles GET /compute-clusters/{id}/users.
+// It writes the user mappings the service returns for the cluster in the path
+// with 200 OK; service errors are delegated to writeServiceError.
+func (s *Server) listComputeClusterUsersByCluster(w http.ResponseWriter, r *http.Request) {
+       clusterID := r.PathValue("id")
+
+       mappings, svcErr := s.svc.ListComputeClusterUsersByCluster(r.Context(), clusterID)
+       if svcErr != nil {
+               writeServiceError(w, svcErr)
+               return
+       }
+
+       writeJSON(w, http.StatusOK, mappings)
+}
+
+// getComputeClusterUserByPair handles GET /compute-clusters/{id}/users/{userId}.
+// It fetches the mapping for the cluster and user named in the path and
+// writes it with 200 OK; service errors are delegated to writeServiceError.
+func (s *Server) getComputeClusterUserByPair(w http.ResponseWriter, r *http.Request) {
+       clusterID := r.PathValue("id")
+       userID := r.PathValue("userId")
+
+       mapping, svcErr := s.svc.GetComputeClusterUserByPair(r.Context(), clusterID, userID)
+       if svcErr != nil {
+               writeServiceError(w, svcErr)
+               return
+       }
+
+       writeJSON(w, http.StatusOK, mapping)
+}
+
+// listComputeClusterUsersByUser handles GET /users/{id}/compute-cluster-users.
+// It writes the cluster mappings the service returns for the user in the path
+// with 200 OK; service errors are delegated to writeServiceError.
+func (s *Server) listComputeClusterUsersByUser(w http.ResponseWriter, r *http.Request) {
+       userID := r.PathValue("id")
+
+       mappings, svcErr := s.svc.ListComputeClusterUsersByUser(r.Context(), userID)
+       if svcErr != nil {
+               writeServiceError(w, svcErr)
+               return
+       }
+
+       writeJSON(w, http.StatusOK, mappings)
+}
+
 func (s *Server) createComputeAllocation(w http.ResponseWriter, r 
*http.Request) {
        var a models.ComputeAllocation
        if err := decodeJSON(r, &a); err != nil {
diff --git a/internal/store/compute_cluster_user_store.go 
b/internal/store/compute_cluster_user_store.go
new file mode 100644
index 000000000..f5926ad1e
--- /dev/null
+++ b/internal/store/compute_cluster_user_store.go
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+
+       "github.com/jmoiron/sqlx"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// mysqlComputeClusterUserStore is the sqlx-backed store returned by
+// NewComputeClusterUserStore.
+type mysqlComputeClusterUserStore struct {
+       db *sqlx.DB
+}
+
+// NewComputeClusterUserStore builds a MySQL-backed ComputeClusterUserStore
+// that runs its queries through db.
+func NewComputeClusterUserStore(db *sqlx.DB) ComputeClusterUserStore {
+       store := new(mysqlComputeClusterUserStore)
+       store.db = db
+       return store
+}
+
+// FindByID loads the mapping whose primary key equals id. It returns
+// (nil, nil) when no row matches and (nil, err) on any other query error.
+func (s *mysqlComputeClusterUserStore) FindByID(ctx context.Context, id string) (*models.ComputeClusterUser, error) {
+       mapping := new(models.ComputeClusterUser)
+       err := s.db.GetContext(ctx, mapping,
+               `SELECT id, compute_cluster_id, user_id, local_username
+           FROM compute_cluster_users WHERE id = ?`, id)
+       switch {
+       case err == nil:
+               return mapping, nil
+       case errors.Is(err, sql.ErrNoRows):
+               // A missing row is reported as a nil result, not as an error.
+               return nil, nil
+       default:
+               return nil, err
+       }
+}
+
+// FindByPair loads the mapping for the given cluster and user IDs. It returns
+// (nil, nil) when no row matches and (nil, err) on any other query error.
+func (s *mysqlComputeClusterUserStore) FindByPair(ctx context.Context, clusterID, userID string) (*models.ComputeClusterUser, error) {
+       mapping := new(models.ComputeClusterUser)
+       err := s.db.GetContext(ctx, mapping,
+               `SELECT id, compute_cluster_id, user_id, local_username
+           FROM compute_cluster_users
+          WHERE compute_cluster_id = ? AND user_id = ?`, clusterID, userID)
+       switch {
+       case err == nil:
+               return mapping, nil
+       case errors.Is(err, sql.ErrNoRows):
+               // A missing row is reported as a nil result, not as an error.
+               return nil, nil
+       default:
+               return nil, err
+       }
+}
+
+// FindByCluster returns every mapping whose compute_cluster_id equals
+// clusterID, ordered by local_username.
+func (s *mysqlComputeClusterUserStore) FindByCluster(ctx context.Context, clusterID string) ([]models.ComputeClusterUser, error) {
+       const query = `SELECT id, compute_cluster_id, user_id, local_username
+           FROM compute_cluster_users
+          WHERE compute_cluster_id = ?
+          ORDER BY local_username`
+
+       var mappings []models.ComputeClusterUser
+       if err := s.db.SelectContext(ctx, &mappings, query, clusterID); err != nil {
+               return nil, err
+       }
+       return mappings, nil
+}
+
+// FindByUser returns every mapping whose user_id equals userID, ordered by
+// compute_cluster_id.
+func (s *mysqlComputeClusterUserStore) FindByUser(ctx context.Context, userID string) ([]models.ComputeClusterUser, error) {
+       const query = `SELECT id, compute_cluster_id, user_id, local_username
+           FROM compute_cluster_users
+          WHERE user_id = ?
+          ORDER BY compute_cluster_id`
+
+       var mappings []models.ComputeClusterUser
+       if err := s.db.SelectContext(ctx, &mappings, query, userID); err != nil {
+               return nil, err
+       }
+       return mappings, nil
+}
+
+// Create inserts c as a new compute_cluster_users row using the supplied
+// transaction and returns any error from the INSERT.
+func (s *mysqlComputeClusterUserStore) Create(ctx context.Context, tx *sql.Tx, c *models.ComputeClusterUser) error {
+       const stmt = `INSERT INTO compute_cluster_users (id, compute_cluster_id, user_id, local_username)
+         VALUES (?, ?, ?, ?)`
+
+       if _, err := tx.ExecContext(ctx, stmt, c.ID, c.ComputeClusterID, c.UserID, c.LocalUsername); err != nil {
+               return err
+       }
+       return nil
+}
+
+// Update overwrites compute_cluster_id, user_id, and local_username on the
+// row whose id equals c.ID, using the supplied transaction. It returns any
+// error from the UPDATE.
+func (s *mysqlComputeClusterUserStore) Update(ctx context.Context, tx *sql.Tx, c *models.ComputeClusterUser) error {
+       const stmt = `UPDATE compute_cluster_users
+            SET compute_cluster_id = ?,
+                user_id            = ?,
+                local_username     = ?
+          WHERE id = ?`
+
+       if _, err := tx.ExecContext(ctx, stmt, c.ComputeClusterID, c.UserID, c.LocalUsername, c.ID); err != nil {
+               return err
+       }
+       return nil
+}
+
+// Delete removes the compute_cluster_users row whose id equals id, using the
+// supplied transaction, and returns any error from the DELETE.
+func (s *mysqlComputeClusterUserStore) Delete(ctx context.Context, tx *sql.Tx, id string) error {
+       const stmt = `DELETE FROM compute_cluster_users WHERE id = ?`
+
+       if _, err := tx.ExecContext(ctx, stmt, id); err != nil {
+               return err
+       }
+       return nil
+}
diff --git a/internal/store/store.go b/internal/store/store.go
index ba6faa9f0..4dddec180 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -71,6 +71,25 @@ type ComputeClusterStore interface {
        Delete(ctx context.Context, tx *sql.Tx, id string) error
 }
 
+// ComputeClusterUserStore defines persistence operations for the mapping of
+// a Custos user to their local account on a compute cluster.
+//
+// The Find* lookups take only a context; Create, Update, and Delete also take
+// the *sql.Tx they run in.
+type ComputeClusterUserStore interface {
+       // FindByID returns the mapping with the given ID, or nil if not found.
+       FindByID(ctx context.Context, id string) (*models.ComputeClusterUser, error)
+       // FindByPair returns the mapping for a (compute_cluster_id, user_id) pair, or nil if absent.
+       FindByPair(ctx context.Context, clusterID, userID string) (*models.ComputeClusterUser, error)
+       // FindByCluster returns every user mapping for the given compute cluster.
+       FindByCluster(ctx context.Context, clusterID string) ([]models.ComputeClusterUser, error)
+       // FindByUser returns every cluster mapping held by the given Custos user.
+       FindByUser(ctx context.Context, userID string) ([]models.ComputeClusterUser, error)
+       // Create inserts a new mapping within the provided transaction.
+       Create(ctx context.Context, tx *sql.Tx, c *models.ComputeClusterUser) error
+       // Update replaces mutable fields of an existing mapping within the provided transaction.
+       Update(ctx context.Context, tx *sql.Tx, c *models.ComputeClusterUser) error
+       // Delete removes a mapping by ID within the provided transaction.
+       Delete(ctx context.Context, tx *sql.Tx, id string) error
+}
+
 // ProjectStore defines persistence operations for projects.
 type ProjectStore interface {
        // FindByID returns the project with the given ID, or nil if not found.
diff --git a/pkg/events/compute_cluster_user_subscribe.go 
b/pkg/events/compute_cluster_user_subscribe.go
new file mode 100644
index 000000000..5ba7a11a7
--- /dev/null
+++ b/pkg/events/compute_cluster_user_subscribe.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// ComputeClusterUserHandler handles compute-cluster user lifecycle events
+// with a typed payload. The payload is passed to the handler by value.
+type ComputeClusterUserHandler func(user models.ComputeClusterUser)
+
+// SubscribeComputeClusterUserCreated attaches handler to the
+// compute_cluster_user::create topic, so it runs for each published creation
+// event that carries a compute-cluster user payload.
+func (b *Bus) SubscribeComputeClusterUserCreated(handler ComputeClusterUserHandler) {
+       const topic = ComputeClusterUserCreateEvent
+       b.subscribeComputeClusterUser(topic, handler)
+}
+
+// SubscribeComputeClusterUserUpdated attaches handler to the
+// compute_cluster_user::update topic, so it runs for each published update
+// event that carries a compute-cluster user payload.
+func (b *Bus) SubscribeComputeClusterUserUpdated(handler ComputeClusterUserHandler) {
+       const topic = ComputeClusterUserUpdateEvent
+       b.subscribeComputeClusterUser(topic, handler)
+}
+
+// SubscribeComputeClusterUserDeleted attaches handler to the
+// compute_cluster_user::delete topic, so it runs for each published deletion
+// event that carries a compute-cluster user payload.
+func (b *Bus) SubscribeComputeClusterUserDeleted(handler ComputeClusterUserHandler) {
+       const topic = ComputeClusterUserDeleteEvent
+       b.subscribeComputeClusterUser(topic, handler)
+}
+
+// subscribeComputeClusterUser registers an untyped bus subscription on topic
+// that adapts the payload to the typed handler. A models.ComputeClusterUser
+// value is passed through, a non-nil *models.ComputeClusterUser is
+// dereferenced, a nil pointer is silently ignored, and any other payload is
+// logged as a warning without invoking handler.
+func (b *Bus) subscribeComputeClusterUser(topic EventType, handler ComputeClusterUserHandler) {
+       dispatch := func(event Event, value any) {
+               if byValue, ok := value.(models.ComputeClusterUser); ok {
+                       handler(byValue)
+                       return
+               }
+               if byPointer, ok := value.(*models.ComputeClusterUser); ok {
+                       if byPointer != nil {
+                               handler(*byPointer)
+                       }
+                       return
+               }
+               slog.Warn("compute cluster user event payload has unexpected type",
+                       "type", event.Type,
+                       "got", value,
+               )
+       }
+       b.Subscribe(topic, dispatch)
+}
diff --git a/pkg/events/types.go b/pkg/events/types.go
index 11c900e94..44758f0e4 100644
--- a/pkg/events/types.go
+++ b/pkg/events/types.go
@@ -54,6 +54,13 @@ const (
        ComputeClusterDeleteEvent EventType = "compute_cluster::delete"
 )
 
+// ComputeClusterUser lifecycle message types. Each value is the topic name
+// for the corresponding create, update, or delete event.
+const (
+       ComputeClusterUserCreateEvent EventType = "compute_cluster_user::create"
+       ComputeClusterUserUpdateEvent EventType = "compute_cluster_user::update"
+       ComputeClusterUserDeleteEvent EventType = "compute_cluster_user::delete"
+)
+
 // ClusterAccount lifecycle message types.
 const (
        ClusterAccountCreateEvent EventType = "cluster_account::create"
diff --git a/pkg/models/allocation.go b/pkg/models/allocation.go
index 1037b1d7f..7813dcc2a 100644
--- a/pkg/models/allocation.go
+++ b/pkg/models/allocation.go
@@ -15,6 +15,13 @@ type ComputeCluster struct {
        Name string `json:"name" db:"name"` // A human-readable name for the 
compute cluster, e.g., "Cluster A", "Cluster B", etc.
 }
 
+// ComputeClusterUser maps a user to their local account on a compute cluster.
+type ComputeClusterUser struct {
+       ID               string `json:"id"                db:"id"`
+       ComputeClusterID string `json:"compute_cluster_id" db:"compute_cluster_id"`
+       UserID           string `json:"user_id"            db:"user_id"`
+       LocalUsername    string `json:"local_username"     db:"local_username"` // The username of the user on the compute cluster, which may be different from their Airavata Custos username.
+}
+
 type ComputeAllocation struct {
        ID               string           `json:"id"                 db:"id"`
        ProjectID        string           `json:"project_id"         
db:"project_id"`
diff --git a/pkg/service/compute_cluster_user.go 
b/pkg/service/compute_cluster_user.go
new file mode 100644
index 000000000..3e8ed9868
--- /dev/null
+++ b/pkg/service/compute_cluster_user.go
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package service
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+
+       "github.com/apache/airavata-custos/pkg/events"
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// CreateComputeClusterUser persists a new compute-cluster user mapping. If
+// the ID is empty, a UUID is generated. The (possibly populated) record is
+// returned.
+func (s *Service) CreateComputeClusterUser(ctx context.Context, cu 
*models.ComputeClusterUser) (*models.ComputeClusterUser, error) {
+       if cu == nil {
+               return nil, fmt.Errorf("%w: compute cluster user is nil", 
ErrInvalidInput)
+       }
+       if cu.ComputeClusterID == "" {
+               return nil, fmt.Errorf("%w: compute_cluster_id is required", 
ErrInvalidInput)
+       }
+       if cu.UserID == "" {
+               return nil, fmt.Errorf("%w: user_id is required", 
ErrInvalidInput)
+       }
+       if cu.LocalUsername == "" {
+               return nil, fmt.Errorf("%w: local_username is required", 
ErrInvalidInput)
+       }
+       if cu.ID == "" {
+               cu.ID = newID()
+       }
+
+       if cluster, err := s.clusters.FindByID(ctx, cu.ComputeClusterID); err 
!= nil {
+               return nil, fmt.Errorf("lookup compute cluster: %w", err)
+       } else if cluster == nil {
+               return nil, fmt.Errorf("%w: compute cluster %q not found", 
ErrInvalidInput, cu.ComputeClusterID)
+       }
+       if user, err := s.users.FindByID(ctx, cu.UserID); err != nil {
+               return nil, fmt.Errorf("lookup user: %w", err)
+       } else if user == nil {
+               return nil, fmt.Errorf("%w: user %q not found", 
ErrInvalidInput, cu.UserID)
+       }
+
+       if existing, err := s.clusterUsers.FindByPair(ctx, cu.ComputeClusterID, 
cu.UserID); err != nil {
+               return nil, fmt.Errorf("lookup compute cluster user by pair: 
%w", err)
+       } else if existing != nil {
+               return nil, fmt.Errorf("%w: user %q is already mapped on 
cluster %q",
+                       ErrAlreadyExists, cu.UserID, cu.ComputeClusterID)
+       }
+
+       if err := s.inTx(ctx, func(tx *sql.Tx) error {
+               return s.clusterUsers.Create(ctx, tx, cu)
+       }); err != nil {
+               return nil, fmt.Errorf("create compute cluster user: %w", err)
+       }
+
+       s.eventBus.Publish(events.ComputeClusterUserCreateEvent, cu)
+       return cu, nil
+}
+
+// GetComputeClusterUser looks up a single compute-cluster user mapping by its
+// ID. A store failure is returned wrapped; a nil result from the store is
+// reported as ErrNotFound.
+func (s *Service) GetComputeClusterUser(ctx context.Context, id string) (*models.ComputeClusterUser, error) {
+       mapping, err := s.clusterUsers.FindByID(ctx, id)
+       switch {
+       case err != nil:
+               return nil, fmt.Errorf("get compute cluster user: %w", err)
+       case mapping == nil:
+               return nil, ErrNotFound
+       }
+       return mapping, nil
+}
+
+// GetComputeClusterUserByPair looks up the mapping identified by a
+// (compute_cluster_id, user_id) pair. Both identifiers are required; an empty
+// one yields ErrInvalidInput. A nil result from the store is reported as
+// ErrNotFound.
+func (s *Service) GetComputeClusterUserByPair(ctx context.Context, clusterID, userID string) (*models.ComputeClusterUser, error) {
+       switch {
+       case clusterID == "":
+               return nil, fmt.Errorf("%w: compute_cluster_id is required", ErrInvalidInput)
+       case userID == "":
+               return nil, fmt.Errorf("%w: user_id is required", ErrInvalidInput)
+       }
+
+       mapping, err := s.clusterUsers.FindByPair(ctx, clusterID, userID)
+       switch {
+       case err != nil:
+               return nil, fmt.Errorf("get compute cluster user by pair: %w", err)
+       case mapping == nil:
+               return nil, ErrNotFound
+       }
+       return mapping, nil
+}
+
+// ListComputeClusterUsersByCluster returns every user mapping for the given
+// compute cluster, ordered by local username.
+func (s *Service) ListComputeClusterUsersByCluster(ctx context.Context, 
clusterID string) ([]models.ComputeClusterUser, error) {
+       if clusterID == "" {
+               return nil, fmt.Errorf("%w: compute_cluster_id is required", 
ErrInvalidInput)
+       }
+       users, err := s.clusterUsers.FindByCluster(ctx, clusterID)
+       if err != nil {
+               return nil, fmt.Errorf("list compute cluster users by cluster: 
%w", err)
+       }
+       return users, nil
+}
+
+// ListComputeClusterUsersByUser returns every cluster mapping held by the
+// given Custos user.
+func (s *Service) ListComputeClusterUsersByUser(ctx context.Context, userID 
string) ([]models.ComputeClusterUser, error) {
+       if userID == "" {
+               return nil, fmt.Errorf("%w: user_id is required", 
ErrInvalidInput)
+       }
+       users, err := s.clusterUsers.FindByUser(ctx, userID)
+       if err != nil {
+               return nil, fmt.Errorf("list compute cluster users by user: 
%w", err)
+       }
+       return users, nil
+}
+
+// UpdateComputeClusterUser persists changes to an existing compute-cluster
+// user mapping. The ID, compute_cluster_id, user_id and local_username must
+// all be set, otherwise ErrInvalidInput is returned. The mapping is looked up
+// first and ErrNotFound is returned when it does not exist, mirroring
+// DeleteComputeClusterUser, so that an update for an unknown ID is never
+// reported as success or announced on the event bus. After a successful write
+// a ComputeClusterUserUpdateEvent carrying cu is published.
+func (s *Service) UpdateComputeClusterUser(ctx context.Context, cu *models.ComputeClusterUser) error {
+       if cu == nil || cu.ID == "" {
+               return fmt.Errorf("%w: compute cluster user id is required", ErrInvalidInput)
+       }
+       if cu.ComputeClusterID == "" {
+               return fmt.Errorf("%w: compute_cluster_id is required", ErrInvalidInput)
+       }
+       if cu.UserID == "" {
+               return fmt.Errorf("%w: user_id is required", ErrInvalidInput)
+       }
+       if cu.LocalUsername == "" {
+               return fmt.Errorf("%w: local_username is required", ErrInvalidInput)
+       }
+
+       // An UPDATE that matches no row is not necessarily an error at the
+       // database level, so existence is checked explicitly before writing.
+       existing, err := s.clusterUsers.FindByID(ctx, cu.ID)
+       if err != nil {
+               return fmt.Errorf("lookup compute cluster user: %w", err)
+       }
+       if existing == nil {
+               return ErrNotFound
+       }
+
+       if err := s.inTx(ctx, func(tx *sql.Tx) error {
+               return s.clusterUsers.Update(ctx, tx, cu)
+       }); err != nil {
+               return fmt.Errorf("update compute cluster user: %w", err)
+       }
+
+       s.eventBus.Publish(events.ComputeClusterUserUpdateEvent, cu)
+       return nil
+}
+
+// DeleteComputeClusterUser deletes the compute-cluster user mapping with the
+// given ID. An empty ID yields ErrInvalidInput and an unknown ID yields
+// ErrNotFound. The record is loaded before deletion so that it can be sent as
+// the payload of the ComputeClusterUserDeleteEvent published afterwards.
+func (s *Service) DeleteComputeClusterUser(ctx context.Context, id string) error {
+       if id == "" {
+               return fmt.Errorf("%w: compute cluster user id is required", ErrInvalidInput)
+       }
+
+       target, err := s.clusterUsers.FindByID(ctx, id)
+       switch {
+       case err != nil:
+               return fmt.Errorf("lookup compute cluster user: %w", err)
+       case target == nil:
+               return ErrNotFound
+       }
+
+       remove := func(tx *sql.Tx) error {
+               return s.clusterUsers.Delete(ctx, tx, id)
+       }
+       if err = s.inTx(ctx, remove); err != nil {
+               return fmt.Errorf("delete compute cluster user: %w", err)
+       }
+
+       s.eventBus.Publish(events.ComputeClusterUserDeleteEvent, target)
+       return nil
+}
diff --git a/pkg/service/service.go b/pkg/service/service.go
index 935630ac5..e187ff82f 100644
--- a/pkg/service/service.go
+++ b/pkg/service/service.go
@@ -40,6 +40,7 @@ type Service struct {
        users            store.UserStore
        projs            store.ProjectStore
        clusters         store.ComputeClusterStore
+       clusterUsers     store.ComputeClusterUserStore
        allocs           store.ComputeAllocationStore
        resources        store.ComputeAllocationResourceStore
        resourceMappings store.ComputeAllocationResourceMappingStore
@@ -61,6 +62,7 @@ func New(database *sqlx.DB, eventBus *events.Bus) *Service {
                users:            store.NewUserStore(database),
                projs:            store.NewProjectStore(database),
                clusters:         store.NewComputeClusterStore(database),
+               clusterUsers:     store.NewComputeClusterUserStore(database),
                allocs:           store.NewComputeAllocationStore(database),
                resources:        
store.NewComputeAllocationResourceStore(database),
                resourceMappings: 
store.NewComputeAllocationResourceMappingStore(database),
@@ -83,6 +85,7 @@ func NewWithStores(
        users store.UserStore,
        projs store.ProjectStore,
        clusters store.ComputeClusterStore,
+       clusterUsers store.ComputeClusterUserStore,
        allocs store.ComputeAllocationStore,
        resources store.ComputeAllocationResourceStore,
        resourceMappings store.ComputeAllocationResourceMappingStore,
@@ -100,6 +103,7 @@ func NewWithStores(
                users:            users,
                projs:            projs,
                clusters:         clusters,
+               clusterUsers:     clusterUsers,
                allocs:           allocs,
                resources:        resources,
                resourceMappings: resourceMappings,


Reply via email to