This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 50f03d8eeae366e018308576415c661dbc0e5dd8 Author: lahiruj <[email protected]> AuthorDate: Tue May 19 02:11:37 2026 -0400 Add Project.Status, ClusterAccount, UserMerge, and identity helpers to core --- .../db/migrations/000011_projects_status.down.sql | 20 +++ .../db/migrations/000011_projects_status.up.sql | 20 +++ .../db/migrations/000012_cluster_accounts.down.sql | 18 +++ .../db/migrations/000012_cluster_accounts.up.sql | 34 ++++ .../db/migrations/000013_users_status.down.sql | 20 +++ internal/db/migrations/000013_users_status.up.sql | 20 +++ internal/db/migrations/000014_user_merges.down.sql | 18 +++ internal/db/migrations/000014_user_merges.up.sql | 30 ++++ internal/server/server.go | 138 +++++++++++++++- internal/store/cluster_account_store.go | 128 +++++++++++++++ .../store/compute_allocation_membership_store.go | 20 +++ internal/store/external_identity_store.go | 7 + internal/store/project_store.go | 30 +++- internal/store/store.go | 59 ++++++- internal/store/user_dn_store.go | 15 ++ internal/store/user_merge_store.go | 77 +++++++++ internal/store/user_store.go | 24 +-- pkg/events/cluster_account_subscribe.go | 63 ++++++++ pkg/events/external_identity_subscribe.go | 17 ++ pkg/events/user_dn_subscribe.go | 17 ++ pkg/models/allocation.go | 11 ++ pkg/models/project.go | 48 ++++-- pkg/service/cluster_account.go | 176 +++++++++++++++++++++ pkg/service/project.go | 36 +++++ pkg/service/service.go | 8 + pkg/service/user.go | 24 +++ pkg/service/user_merge.go | 100 ++++++++++++ 27 files changed, 1142 insertions(+), 36 deletions(-) diff --git a/internal/db/migrations/000011_projects_status.down.sql b/internal/db/migrations/000011_projects_status.down.sql new file mode 100644 index 000000000..136999f6f --- /dev/null +++ b/internal/db/migrations/000011_projects_status.down.sql @@ -0,0 +1,20 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +ALTER TABLE projects + DROP KEY idx_projects_status, + DROP COLUMN status; diff --git a/internal/db/migrations/000011_projects_status.up.sql b/internal/db/migrations/000011_projects_status.up.sql new file mode 100644 index 000000000..1de30ed61 --- /dev/null +++ b/internal/db/migrations/000011_projects_status.up.sql @@ -0,0 +1,20 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +ALTER TABLE projects + ADD COLUMN status VARCHAR(32) NOT NULL DEFAULT 'ACTIVE' AFTER project_pi_id, + ADD KEY idx_projects_status (status); diff --git a/internal/db/migrations/000012_cluster_accounts.down.sql b/internal/db/migrations/000012_cluster_accounts.down.sql new file mode 100644 index 000000000..f3ca1be95 --- /dev/null +++ b/internal/db/migrations/000012_cluster_accounts.down.sql @@ -0,0 +1,18 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +DROP TABLE IF EXISTS cluster_accounts; diff --git a/internal/db/migrations/000012_cluster_accounts.up.sql b/internal/db/migrations/000012_cluster_accounts.up.sql new file mode 100644 index 000000000..3eab32279 --- /dev/null +++ b/internal/db/migrations/000012_cluster_accounts.up.sql @@ -0,0 +1,34 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +CREATE TABLE IF NOT EXISTS cluster_accounts +( + id VARCHAR(255) NOT NULL, + user_id VARCHAR(255) NOT NULL, + compute_cluster_id VARCHAR(255) NOT NULL, + username VARCHAR(255) NOT NULL, + status VARCHAR(32) NOT NULL DEFAULT 'ACTIVE', + created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6), + PRIMARY KEY (id), + UNIQUE KEY uq_cluster_accounts_cluster_username (compute_cluster_id, username), + KEY idx_cluster_accounts_user (user_id), + KEY idx_cluster_accounts_cluster (compute_cluster_id), + KEY idx_cluster_accounts_status (status), + CONSTRAINT fk_cluster_accounts_user FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE, + CONSTRAINT fk_cluster_accounts_cluster FOREIGN KEY (compute_cluster_id) REFERENCES compute_clusters (id) ON DELETE CASCADE +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci; diff --git a/internal/db/migrations/000013_users_status.down.sql b/internal/db/migrations/000013_users_status.down.sql new file mode 100644 index 000000000..8c5d8445d --- /dev/null +++ b/internal/db/migrations/000013_users_status.down.sql @@ -0,0 +1,20 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +ALTER TABLE users + DROP KEY idx_users_status, + DROP COLUMN status; diff --git a/internal/db/migrations/000013_users_status.up.sql b/internal/db/migrations/000013_users_status.up.sql new file mode 100644 index 000000000..5b88416a6 --- /dev/null +++ b/internal/db/migrations/000013_users_status.up.sql @@ -0,0 +1,20 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +ALTER TABLE users + ADD COLUMN status VARCHAR(32) NOT NULL DEFAULT 'ACTIVE' AFTER email, + ADD KEY idx_users_status (status); diff --git a/internal/db/migrations/000014_user_merges.down.sql b/internal/db/migrations/000014_user_merges.down.sql new file mode 100644 index 000000000..bc65c15a1 --- /dev/null +++ b/internal/db/migrations/000014_user_merges.down.sql @@ -0,0 +1,18 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +DROP TABLE IF EXISTS user_merges; diff --git a/internal/db/migrations/000014_user_merges.up.sql b/internal/db/migrations/000014_user_merges.up.sql new file mode 100644 index 000000000..7aeeb8844 --- /dev/null +++ b/internal/db/migrations/000014_user_merges.up.sql @@ -0,0 +1,30 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +CREATE TABLE IF NOT EXISTS user_merges +( + id BIGINT NOT NULL AUTO_INCREMENT, + retiring_user_id VARCHAR(255) NOT NULL, + surviving_user_id VARCHAR(255) NOT NULL, + reason TEXT NULL, + merged_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + PRIMARY KEY (id), + UNIQUE KEY uq_user_merges_retiring (retiring_user_id), + KEY idx_user_merges_surviving (surviving_user_id), + CONSTRAINT fk_user_merges_retiring FOREIGN KEY (retiring_user_id) REFERENCES users (id) ON DELETE RESTRICT, + CONSTRAINT fk_user_merges_surviving FOREIGN KEY (surviving_user_id) REFERENCES users (id) ON DELETE RESTRICT +) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci; diff --git a/internal/server/server.go b/internal/server/server.go index 56b6a6bd7..09f33d22b 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -56,9 +56,12 @@ func (s *Server) routes() { s.mux.HandleFunc("POST /users", s.createUser) s.mux.HandleFunc("GET /users/{id}", s.getUser) + s.mux.HandleFunc("GET /users/external-identity/{source}/{externalId}", s.getUserByExternalIdentity) + s.mux.HandleFunc("POST /users/merge", s.mergeUsers) s.mux.HandleFunc("POST /projects", s.createProject) s.mux.HandleFunc("GET /projects/{id}", s.getProject) + s.mux.HandleFunc("PUT /projects/{id}/status", s.updateProjectStatus) s.mux.HandleFunc("POST /compute-clusters", s.createComputeCluster) s.mux.HandleFunc("GET /compute-clusters", s.listComputeClusters) @@ -117,19 +120,27 @@ func (s *Server) routes() { s.mux.HandleFunc("GET /compute-allocations/{id}/users/{userId}/usages/total", s.getTotalSUUsageForUserInAllocation) s.mux.HandleFunc("GET /users/{id}/compute-allocation-usages", s.listUsagesByUser) + s.mux.HandleFunc("POST /cluster-accounts", s.createClusterAccount) + s.mux.HandleFunc("GET /cluster-accounts/{id}", s.getClusterAccount) + s.mux.HandleFunc("PUT /cluster-accounts/{id}/status", s.updateClusterAccountStatus) + s.mux.HandleFunc("DELETE /cluster-accounts/{id}", s.deleteClusterAccount) + s.mux.HandleFunc("GET /compute-clusters/{id}/accounts", s.listClusterAccountsForCluster) + s.mux.HandleFunc("GET /compute-clusters/{id}/accounts/{username}", s.getClusterAccountByClusterAndUsername) + s.mux.HandleFunc("GET /users/{id}/cluster-accounts", s.listClusterAccountsForUser) + s.mux.HandleFunc("POST /external-identities", s.createExternalIdentity) s.mux.HandleFunc("GET /external-identities/{id}", s.getExternalIdentity) s.mux.HandleFunc("PUT /external-identities/{id}", s.updateExternalIdentity) s.mux.HandleFunc("DELETE /external-identities/{id}", s.deleteExternalIdentity) - s.mux.HandleFunc("GET /external-identities/by-source/{source}/{externalId}", s.getExternalIdentityBySource) - s.mux.HandleFunc("GET /external-identities/by-oidc-sub/{sub}", s.getExternalIdentityByOIDCSub) + s.mux.HandleFunc("GET /external-identities/source/{source}/{externalId}", s.getExternalIdentityBySource) + s.mux.HandleFunc("GET /external-identities/oidc-sub/{sub}", s.getExternalIdentityByOIDCSub) s.mux.HandleFunc("GET /users/{id}/external-identities", s.listExternalIdentitiesForUser) s.mux.HandleFunc("POST /users/{id}/dns", s.addUserDN) s.mux.HandleFunc("GET /user-dns/{id}", s.getUserDN) s.mux.HandleFunc("DELETE /user-dns/{id}", s.removeUserDN) s.mux.HandleFunc("GET /users/{id}/dns", s.listUserDNs) - s.mux.HandleFunc("GET /user-dns/by-dn", s.getUserDNByDN) + s.mux.HandleFunc("GET /user-dns", s.getUserDNByDN) } func (s *Server) healthz(w http.ResponseWriter, _ *http.Request) { @@ -182,6 +193,33 @@ func (s *Server) getUser(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, u) } +func (s *Server) getUserByExternalIdentity(w http.ResponseWriter, r *http.Request) { + u, err := s.svc.GetUserByExternalIdentity(r.Context(), r.PathValue("source"), r.PathValue("externalId")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, u) +} + +func (s *Server) mergeUsers(w http.ResponseWriter, r *http.Request) { + var body struct { + SurvivingUserID string `json:"surviving_user_id"` + RetiringUserID string `json:"retiring_user_id"` + Reason string `json:"reason,omitempty"` + } + if err := decodeJSON(r, &body); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + survivor, err := s.svc.MergeUsers(r.Context(), body.SurvivingUserID, body.RetiringUserID, body.Reason) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, survivor) +} + func (s *Server) createProject(w http.ResponseWriter, r *http.Request) { var p models.Project if err := decodeJSON(r, &p); err != nil { @@ -720,6 +758,96 @@ func (s *Server) getTotalSUUsageForUserInAllocation(w http.ResponseWriter, r *ht }) } +func (s *Server) updateProjectStatus(w http.ResponseWriter, r *http.Request) { + var body struct { + Status models.AllocationStatus `json:"status"` + } + if err := decodeJSON(r, &body); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + updated, err := s.svc.UpdateProjectStatus(r.Context(), r.PathValue("id"), body.Status) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, updated) +} + +func (s *Server) createClusterAccount(w http.ResponseWriter, r *http.Request) { + var a models.ClusterAccount + if err := decodeJSON(r, &a); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + created, err := s.svc.CreateClusterAccount(r.Context(), &a) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusCreated, created) +} + +func (s *Server) getClusterAccount(w http.ResponseWriter, r *http.Request) { + a, err := s.svc.GetClusterAccount(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, a) +} + +func (s *Server) updateClusterAccountStatus(w http.ResponseWriter, r *http.Request) { + var body struct { + Status models.AllocationStatus `json:"status"` + } + if err := decodeJSON(r, &body); err != nil { + writeError(w, http.StatusBadRequest, err) + return + } + updated, err := s.svc.UpdateClusterAccountStatus(r.Context(), r.PathValue("id"), body.Status) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, updated) +} + +func (s *Server) deleteClusterAccount(w http.ResponseWriter, r *http.Request) { + if err := s.svc.DeleteClusterAccount(r.Context(), r.PathValue("id")); err != nil { + writeServiceError(w, err) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func (s *Server) listClusterAccountsForCluster(w http.ResponseWriter, r *http.Request) { + out, err := s.svc.ListClusterAccountsForCluster(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, out) +} + +func (s *Server) getClusterAccountByClusterAndUsername(w http.ResponseWriter, r *http.Request) { + a, err := s.svc.GetClusterAccountByClusterAndUsername(r.Context(), r.PathValue("id"), r.PathValue("username")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, a) +} + +func (s *Server) listClusterAccountsForUser(w http.ResponseWriter, r *http.Request) { + out, err := s.svc.ListClusterAccountsForUser(r.Context(), r.PathValue("id")) + if err != nil { + writeServiceError(w, err) + return + } + writeJSON(w, http.StatusOK, out) +} + func (s *Server) createExternalIdentity(w http.ResponseWriter, r *http.Request) { var e models.ExternalIdentity if err := decodeJSON(r, &e); err != nil { @@ -762,7 +890,7 @@ func (s *Server) deleteExternalIdentity(w http.ResponseWriter, r *http.Request) writeServiceError(w, err) return } - writeJSON(w, http.StatusNoContent, nil) + w.WriteHeader(http.StatusNoContent) } func (s *Server) getExternalIdentityBySource(w http.ResponseWriter, r *http.Request) { @@ -821,7 +949,7 @@ func (s *Server) removeUserDN(w http.ResponseWriter, r *http.Request) { writeServiceError(w, err) return } - writeJSON(w, http.StatusNoContent, nil) + w.WriteHeader(http.StatusNoContent) } func (s *Server) listUserDNs(w http.ResponseWriter, r *http.Request) { diff --git a/internal/store/cluster_account_store.go b/internal/store/cluster_account_store.go new file mode 100644 index 000000000..06012421d --- /dev/null +++ b/internal/store/cluster_account_store.go @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package store + +import ( + "context" + "database/sql" + "errors" + + "github.com/jmoiron/sqlx" + + "github.com/apache/airavata-custos/pkg/models" +) + +type mysqlClusterAccountStore struct { + db *sqlx.DB +} + +// NewClusterAccountStore returns a MySQL-backed ClusterAccountStore. +func NewClusterAccountStore(db *sqlx.DB) ClusterAccountStore { + return &mysqlClusterAccountStore{db: db} +} + +const clusterAccountColumns = `id, user_id, compute_cluster_id, username, status` + +func (s *mysqlClusterAccountStore) FindByID(ctx context.Context, id string) (*models.ClusterAccount, error) { + var a models.ClusterAccount + err := s.db.GetContext(ctx, &a, + `SELECT `+clusterAccountColumns+` FROM cluster_accounts WHERE id = ?`, id) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &a, nil +} + +func (s *mysqlClusterAccountStore) FindByClusterAndUsername(ctx context.Context, clusterID, username string) (*models.ClusterAccount, error) { + var a models.ClusterAccount + err := s.db.GetContext(ctx, &a, + `SELECT `+clusterAccountColumns+` FROM cluster_accounts WHERE compute_cluster_id = ? AND username = ?`, + clusterID, username) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &a, nil +} + +func (s *mysqlClusterAccountStore) FindByUser(ctx context.Context, userID string) ([]models.ClusterAccount, error) { + var out []models.ClusterAccount + err := s.db.SelectContext(ctx, &out, + `SELECT `+clusterAccountColumns+` FROM cluster_accounts WHERE user_id = ? ORDER BY created_at ASC`, + userID) + if err != nil { + return nil, err + } + return out, nil +} + +func (s *mysqlClusterAccountStore) FindByCluster(ctx context.Context, clusterID string) ([]models.ClusterAccount, error) { + var out []models.ClusterAccount + err := s.db.SelectContext(ctx, &out, + `SELECT `+clusterAccountColumns+` FROM cluster_accounts WHERE compute_cluster_id = ? ORDER BY created_at ASC`, + clusterID) + if err != nil { + return nil, err + } + return out, nil +} + +func (s *mysqlClusterAccountStore) Create(ctx context.Context, tx *sql.Tx, a *models.ClusterAccount) error { + _, err := tx.ExecContext(ctx, + `INSERT INTO cluster_accounts (id, user_id, compute_cluster_id, username, status) + VALUES (?, ?, ?, ?, ?)`, + a.ID, a.UserID, a.ComputeClusterID, a.Username, a.Status) + return err +} + +func (s *mysqlClusterAccountStore) UpdateStatus(ctx context.Context, tx *sql.Tx, id string, status models.AllocationStatus) error { + _, err := tx.ExecContext(ctx, + `UPDATE cluster_accounts SET status = ? WHERE id = ?`, + status, id) + return err +} + +func (s *mysqlClusterAccountStore) ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error { + // Drop fromUserID's accounts whose (cluster, username) the survivor already + // owns, then move the rest. + if _, err := tx.ExecContext(ctx, + `DELETE FROM cluster_accounts + WHERE user_id = ? + AND (compute_cluster_id, username) IN ( + SELECT compute_cluster_id, username FROM ( + SELECT compute_cluster_id, username FROM cluster_accounts WHERE user_id = ? + ) AS s + )`, + fromUserID, toUserID); err != nil { + return err + } + _, err := tx.ExecContext(ctx, + `UPDATE cluster_accounts SET user_id = ? WHERE user_id = ?`, + toUserID, fromUserID) + return err +} + +func (s *mysqlClusterAccountStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { + _, err := tx.ExecContext(ctx, `DELETE FROM cluster_accounts WHERE id = ?`, id) + return err +} diff --git a/internal/store/compute_allocation_membership_store.go b/internal/store/compute_allocation_membership_store.go index aae350ac9..21010108c 100644 --- a/internal/store/compute_allocation_membership_store.go +++ b/internal/store/compute_allocation_membership_store.go @@ -116,6 +116,26 @@ func (s *mysqlComputeAllocationMembershipStore) Update(ctx context.Context, tx * return err } +func (s *mysqlComputeAllocationMembershipStore) ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error { + // Drop fromUserID's memberships in allocations the survivor already belongs + // to, then move the rest. + if _, err := tx.ExecContext(ctx, + `DELETE FROM compute_allocation_memberships + WHERE user_id = ? + AND compute_allocation_id IN ( + SELECT compute_allocation_id FROM ( + SELECT compute_allocation_id FROM compute_allocation_memberships WHERE user_id = ? + ) AS s + )`, + fromUserID, toUserID); err != nil { + return err + } + _, err := tx.ExecContext(ctx, + `UPDATE compute_allocation_memberships SET user_id = ? WHERE user_id = ?`, + toUserID, fromUserID) + return err +} + func (s *mysqlComputeAllocationMembershipStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { _, err := tx.ExecContext(ctx, `DELETE FROM compute_allocation_memberships WHERE id = ?`, id) return err diff --git a/internal/store/external_identity_store.go b/internal/store/external_identity_store.go index 41f9e099c..2b3bac182 100644 --- a/internal/store/external_identity_store.go +++ b/internal/store/external_identity_store.go @@ -106,6 +106,13 @@ func (s *mysqlExternalIdentityStore) Update(ctx context.Context, tx *sql.Tx, e * return err } +func (s *mysqlExternalIdentityStore) ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error { + _, err := tx.ExecContext(ctx, + `UPDATE external_identities SET user_id = ? WHERE user_id = ?`, + toUserID, fromUserID) + return err +} + func (s *mysqlExternalIdentityStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { _, err := tx.ExecContext(ctx, `DELETE FROM external_identities WHERE id = ?`, id) return err diff --git a/internal/store/project_store.go b/internal/store/project_store.go index fc447500a..226836f35 100644 --- a/internal/store/project_store.go +++ b/internal/store/project_store.go @@ -39,7 +39,7 @@ func NewProjectStore(db *sqlx.DB) ProjectStore { func (s *mysqlProjectStore) FindByID(ctx context.Context, id string) (*models.Project, error) { var p models.Project err := s.db.GetContext(ctx, &p, - `SELECT id, originated_id, title, origination, project_pi_id, created_time + `SELECT id, originated_id, title, origination, project_pi_id, status, created_time FROM projects WHERE id = ?`, id) if err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -53,7 +53,7 @@ func (s *mysqlProjectStore) FindByID(ctx context.Context, id string) (*models.Pr func (s *mysqlProjectStore) FindByOriginatedID(ctx context.Context, originatedID string) (*models.Project, error) { var p models.Project err := s.db.GetContext(ctx, &p, - `SELECT id, originated_id, title, origination, project_pi_id, created_time + `SELECT id, originated_id, title, origination, project_pi_id, status, created_time FROM projects WHERE originated_id = ?`, originatedID) if err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -67,7 +67,7 @@ func (s *mysqlProjectStore) FindByOriginatedID(ctx context.Context, originatedID func (s *mysqlProjectStore) FindByPI(ctx context.Context, piUserID string) ([]models.Project, error) { var projects []models.Project err := s.db.SelectContext(ctx, &projects, - `SELECT id, originated_id, title, origination, project_pi_id, created_time + `SELECT id, originated_id, title, origination, project_pi_id, status, created_time FROM projects WHERE project_pi_id = ?`, piUserID) if err != nil { return nil, err @@ -77,17 +77,31 @@ func (s *mysqlProjectStore) FindByPI(ctx context.Context, piUserID string) ([]mo func (s *mysqlProjectStore) Create(ctx context.Context, tx *sql.Tx, p *models.Project) error { _, err := tx.ExecContext(ctx, - `INSERT INTO projects (id, originated_id, title, origination, project_pi_id, created_time) - VALUES (?, ?, ?, ?, ?, ?)`, - p.ID, p.OriginatedID, p.Title, p.Origination, p.ProjectPIID, p.CreatedTime) + `INSERT INTO projects (id, originated_id, title, origination, project_pi_id, status, created_time) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + p.ID, p.OriginatedID, p.Title, p.Origination, p.ProjectPIID, p.Status, p.CreatedTime) return err } func (s *mysqlProjectStore) Update(ctx context.Context, tx *sql.Tx, p *models.Project) error { _, err := tx.ExecContext(ctx, - `UPDATE projects SET originated_id = ?, title = ?, origination = ?, project_pi_id = ? + `UPDATE projects SET originated_id = ?, title = ?, origination = ?, project_pi_id = ?, status = ? WHERE id = ?`, - p.OriginatedID, p.Title, p.Origination, p.ProjectPIID, p.ID) + p.OriginatedID, p.Title, p.Origination, p.ProjectPIID, p.Status, p.ID) + return err +} + +func (s *mysqlProjectStore) UpdateStatus(ctx context.Context, tx *sql.Tx, id string, status models.AllocationStatus) error { + _, err := tx.ExecContext(ctx, + `UPDATE projects SET status = ? WHERE id = ?`, + status, id) + return err +} + +func (s *mysqlProjectStore) ReassignPI(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error { + _, err := tx.ExecContext(ctx, + `UPDATE projects SET project_pi_id = ? WHERE project_pi_id = ?`, + toUserID, fromUserID) return err } diff --git a/internal/store/store.go b/internal/store/store.go index 2ba3caced..355788a99 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -35,12 +35,31 @@ type UserStore interface { FindByOrganization(ctx context.Context, organizationID string) ([]models.User, error) // Create inserts a new user within the provided transaction. Create(ctx context.Context, tx *sql.Tx, u *models.User) error - // Update replaces mutable fields of an existing user within the provided transaction. + // Update replaces mutable fields of an existing user within the provided + // transaction. Does NOT touch status — route status changes through + // UpdateStatus. Update(ctx context.Context, tx *sql.Tx, u *models.User) error - // Delete removes a user by ID within the provided transaction. + // UpdateStatus flips only the lifecycle status (ACTIVE / INACTIVE / MERGED). + UpdateStatus(ctx context.Context, tx *sql.Tx, id string, status models.UserStatus) error + // Delete removes a user by ID within the provided transaction. This is the + // explicit hard-delete. The user-merge flow uses UpdateStatus(MERGED) and a + // row in user_merges instead. Delete(ctx context.Context, tx *sql.Tx, id string) error } +// UserMergeStore defines persistence operations for the user_merges audit +// table, which records the linkage produced by Service.MergeUsers. +type UserMergeStore interface { + // Record inserts a merge linkage. Reason may be empty. + Record(ctx context.Context, tx *sql.Tx, retiringUserID, survivingUserID, reason string) error + // FindByRetiringUser returns the single merge row whose retiring user + // matches, or nil if absent. + FindByRetiringUser(ctx context.Context, retiringUserID string) (*models.UserMerge, error) + // FindBySurvivingUser returns every merge row whose surviving user matches, + // ordered by merged_at ascending. + FindBySurvivingUser(ctx context.Context, survivingUserID string) ([]models.UserMerge, error) +} + // OrganizationStore defines persistence operations for organizations. type OrganizationStore interface { // FindByID returns the organization with the given ID, or nil if not found. @@ -83,6 +102,11 @@ type ProjectStore interface { Create(ctx context.Context, tx *sql.Tx, p *models.Project) error // Update replaces mutable fields of an existing project within the provided transaction. Update(ctx context.Context, tx *sql.Tx, p *models.Project) error + // UpdateStatus updates only the status field of an existing project. + UpdateStatus(ctx context.Context, tx *sql.Tx, id string, status models.AllocationStatus) error + // ReassignPI re-points every project's project_pi_id from fromUserID to + // toUserID. + ReassignPI(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error // Delete removes a project by ID within the provided transaction. Delete(ctx context.Context, tx *sql.Tx, id string) error } @@ -227,10 +251,36 @@ type ComputeAllocationMembershipStore interface { Create(ctx context.Context, tx *sql.Tx, m *models.ComputeAllocationMembership) error // Update replaces mutable fields of an existing membership within the provided transaction. Update(ctx context.Context, tx *sql.Tx, m *models.ComputeAllocationMembership) error + // ReassignUser re-points every membership's user_id from fromUserID to + // toUserID. + ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error // Delete removes a membership by ID within the provided transaction. Delete(ctx context.Context, tx *sql.Tx, id string) error } +// ClusterAccountStore defines persistence operations for posix-style +// accounts provisioned for a user on a specific compute cluster. +// (compute_cluster_id, username) is unique. +type ClusterAccountStore interface { + // FindByID returns the cluster account with the given ID, or nil if absent. + FindByID(ctx context.Context, id string) (*models.ClusterAccount, error) + // FindByClusterAndUsername returns the account for a (cluster, username) + // pair, or nil if absent. + FindByClusterAndUsername(ctx context.Context, clusterID, username string) (*models.ClusterAccount, error) + // FindByUser returns every cluster account belonging to the given user. + FindByUser(ctx context.Context, userID string) ([]models.ClusterAccount, error) + // FindByCluster returns every cluster account on the given cluster. + FindByCluster(ctx context.Context, clusterID string) ([]models.ClusterAccount, error) + // Create inserts a new cluster account within the provided transaction. + Create(ctx context.Context, tx *sql.Tx, a *models.ClusterAccount) error + // UpdateStatus updates only the status field. + UpdateStatus(ctx context.Context, tx *sql.Tx, id string, status models.AllocationStatus) error + // ReassignUser re-points every account from fromUserID to toUserID. + ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error + // Delete removes a cluster account by ID within the provided transaction. + Delete(ctx context.Context, tx *sql.Tx, id string) error +} + // ExternalIdentityStore defines persistence operations for the mapping // between users and their identifiers in external systems. // UNIQUE (source, external_id) is enforced at the schema level. @@ -251,6 +301,9 @@ type ExternalIdentityStore interface { // Update replaces mutable fields of an existing external identity within // the provided transaction. Update(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) error + // ReassignUser re-points every external identity from fromUserID to + // toUserID. + ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error // Delete removes an external identity by ID within the provided transaction. Delete(ctx context.Context, tx *sql.Tx, id string) error } @@ -268,6 +321,8 @@ type UserDNStore interface { FindByUser(ctx context.Context, userID string) ([]models.UserDN, error) // Create inserts a new DN binding within the provided transaction. Create(ctx context.Context, tx *sql.Tx, d *models.UserDN) error + // ReassignUser re-points every DN binding from fromUserID to toUserID. + ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error // Delete removes a DN binding by ID within the provided transaction. Delete(ctx context.Context, tx *sql.Tx, id string) error } diff --git a/internal/store/user_dn_store.go b/internal/store/user_dn_store.go index 656413572..bb0a73741 100644 --- a/internal/store/user_dn_store.go +++ b/internal/store/user_dn_store.go @@ -82,6 +82,21 @@ func (s *mysqlUserDNStore) Create(ctx context.Context, tx *sql.Tx, d *models.Use return err } +func (s *mysqlUserDNStore) ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error { + // Drop fromUserID's rows whose dn the survivor already holds, then move the rest. + if _, err := tx.ExecContext(ctx, + `DELETE FROM user_dns + WHERE user_id = ? + AND dn IN (SELECT dn FROM (SELECT dn FROM user_dns WHERE user_id = ?) AS s)`, + fromUserID, toUserID); err != nil { + return err + } + _, err := tx.ExecContext(ctx, + `UPDATE user_dns SET user_id = ? WHERE user_id = ?`, + toUserID, fromUserID) + return err +} + func (s *mysqlUserDNStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { _, err := tx.ExecContext(ctx, `DELETE FROM user_dns WHERE id = ?`, id) return err diff --git a/internal/store/user_merge_store.go b/internal/store/user_merge_store.go new file mode 100644 index 000000000..827462e55 --- /dev/null +++ b/internal/store/user_merge_store.go @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package store + +import ( + "context" + "database/sql" + "errors" + + "github.com/jmoiron/sqlx" + + "github.com/apache/airavata-custos/pkg/models" +) + +type mysqlUserMergeStore struct { + db *sqlx.DB +} + +// NewUserMergeStore returns a MySQL-backed UserMergeStore. +func NewUserMergeStore(db *sqlx.DB) UserMergeStore { + return &mysqlUserMergeStore{db: db} +} + +const userMergeColumns = `id, retiring_user_id, surviving_user_id, COALESCE(reason, '') AS reason, merged_at` + +func (s *mysqlUserMergeStore) Record(ctx context.Context, tx *sql.Tx, retiringUserID, survivingUserID, reason string) error { + var reasonArg any + if reason == "" { + reasonArg = nil + } else { + reasonArg = reason + } + _, err := tx.ExecContext(ctx, + `INSERT INTO user_merges (retiring_user_id, surviving_user_id, reason) + VALUES (?, ?, ?)`, + retiringUserID, survivingUserID, reasonArg) + return err +} + +func (s *mysqlUserMergeStore) FindByRetiringUser(ctx context.Context, retiringUserID string) (*models.UserMerge, error) { + var m models.UserMerge + err := s.db.GetContext(ctx, &m, + `SELECT `+userMergeColumns+` FROM user_merges WHERE retiring_user_id = ?`, retiringUserID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &m, nil +} + +func (s *mysqlUserMergeStore) FindBySurvivingUser(ctx context.Context, survivingUserID string) ([]models.UserMerge, error) { + var out []models.UserMerge + err := s.db.SelectContext(ctx, &out, + `SELECT `+userMergeColumns+` FROM user_merges WHERE surviving_user_id = ? ORDER BY merged_at ASC`, + survivingUserID) + if err != nil { + return nil, err + } + return out, nil +} diff --git a/internal/store/user_store.go b/internal/store/user_store.go index eda2b749f..30659283b 100644 --- a/internal/store/user_store.go +++ b/internal/store/user_store.go @@ -27,6 +27,8 @@ import ( "github.com/apache/airavata-custos/pkg/models" ) +const userColumns = `id, organization_id, first_name, last_name, middle_name, email, status` + type mysqlUserStore struct { db *sqlx.DB } @@ -39,8 +41,7 @@ func NewUserStore(db *sqlx.DB) UserStore { func (s *mysqlUserStore) FindByID(ctx context.Context, id string) (*models.User, error) { var u models.User err := s.db.GetContext(ctx, &u, - `SELECT id, organization_id, first_name, last_name, middle_name, email - FROM users WHERE id = ?`, id) + `SELECT `+userColumns+` FROM users WHERE id = ?`, id) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil @@ -53,8 +54,7 @@ func (s *mysqlUserStore) FindByID(ctx context.Context, id string) (*models.User, func (s *mysqlUserStore) FindByEmail(ctx context.Context, email string) (*models.User, error) { var u models.User err := s.db.GetContext(ctx, &u, - `SELECT id, organization_id, first_name, last_name, middle_name, email - FROM users WHERE email = ?`, email) + `SELECT `+userColumns+` FROM users WHERE email = ?`, email) if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil @@ -67,8 +67,7 @@ func (s *mysqlUserStore) FindByEmail(ctx context.Context, email string) (*models func (s *mysqlUserStore) FindByOrganization(ctx context.Context, organizationID string) ([]models.User, error) { var users []models.User err := s.db.SelectContext(ctx, &users, - `SELECT id, organization_id, first_name, last_name, middle_name, email - FROM users WHERE organization_id = ?`, organizationID) + `SELECT `+userColumns+` FROM users WHERE organization_id = ?`, organizationID) if err != nil { return nil, err } @@ -77,9 +76,9 @@ func (s *mysqlUserStore) FindByOrganization(ctx context.Context, organizationID func (s *mysqlUserStore) Create(ctx context.Context, tx *sql.Tx, u *models.User) error { _, err := tx.ExecContext(ctx, - `INSERT INTO users (id, organization_id, first_name, last_name, middle_name, email) - VALUES (?, ?, ?, ?, ?, ?)`, - u.ID, u.OrganizationID, u.FirstName, u.LastName, u.MiddleName, u.Email) + `INSERT INTO users (id, organization_id, first_name, last_name, middle_name, email, status) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + u.ID, u.OrganizationID, u.FirstName, u.LastName, u.MiddleName, u.Email, u.Status) return err } @@ -91,6 +90,13 @@ func (s *mysqlUserStore) Update(ctx context.Context, tx *sql.Tx, u *models.User) return err } +func (s *mysqlUserStore) UpdateStatus(ctx context.Context, tx *sql.Tx, id string, status models.UserStatus) error { + _, err := tx.ExecContext(ctx, + `UPDATE users SET status = ? WHERE id = ?`, + status, id) + return err +} + func (s *mysqlUserStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { _, err := tx.ExecContext(ctx, `DELETE FROM users WHERE id = ?`, id) return err diff --git a/pkg/events/cluster_account_subscribe.go b/pkg/events/cluster_account_subscribe.go new file mode 100644 index 000000000..cdb26f97e --- /dev/null +++ b/pkg/events/cluster_account_subscribe.go @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// ClusterAccountHandler handles cluster account lifecycle events with a typed payload. +type ClusterAccountHandler func(a models.ClusterAccount) + +// SubscribeClusterAccountCreated registers a typed handler invoked whenever a +// cluster_account::create event is published. +func (b *Bus) SubscribeClusterAccountCreated(handler ClusterAccountHandler) { + b.subscribeClusterAccount(ClusterAccountCreateEvent, handler) +} + +// SubscribeClusterAccountUpdated registers a typed handler invoked whenever a +// cluster_account::update event is published. +func (b *Bus) SubscribeClusterAccountUpdated(handler ClusterAccountHandler) { + b.subscribeClusterAccount(ClusterAccountUpdateEvent, handler) +} + +// SubscribeClusterAccountDeleted registers a typed handler invoked whenever a +// cluster_account::delete event is published. +func (b *Bus) SubscribeClusterAccountDeleted(handler ClusterAccountHandler) { + b.subscribeClusterAccount(ClusterAccountDeleteEvent, handler) +} + +func (b *Bus) subscribeClusterAccount(topic EventType, handler ClusterAccountHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch a := value.(type) { + case models.ClusterAccount: + handler(a) + case *models.ClusterAccount: + if a != nil { + handler(*a) + } + default: + slog.Warn("cluster account event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/events/external_identity_subscribe.go b/pkg/events/external_identity_subscribe.go index 27fd2f498..83c53aa38 100644 --- a/pkg/events/external_identity_subscribe.go +++ b/pkg/events/external_identity_subscribe.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package events import ( diff --git a/pkg/events/user_dn_subscribe.go b/pkg/events/user_dn_subscribe.go index 67b14fd33..ba6f7723b 100644 --- a/pkg/events/user_dn_subscribe.go +++ b/pkg/events/user_dn_subscribe.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package events import ( diff --git a/pkg/models/allocation.go b/pkg/models/allocation.go index 1037b1d7f..3b26f615c 100644 --- a/pkg/models/allocation.go +++ b/pkg/models/allocation.go @@ -15,6 +15,17 @@ type ComputeCluster struct { Name string `json:"name" db:"name"` // A human-readable name for the compute cluster, e.g., "Cluster A", "Cluster B", etc. } +// ClusterAccount represents the provisioned posix account a user holds on a +// specific compute cluster (e.g. username "alice123" on cluster "abc"). +// The (compute_cluster_id, username) pair is unique. +type ClusterAccount struct { + ID string `json:"id" db:"id"` + UserID string `json:"user_id" db:"user_id"` + ComputeClusterID string `json:"compute_cluster_id" db:"compute_cluster_id"` + Username string `json:"username" db:"username"` + Status AllocationStatus `json:"status" db:"status"` // ACTIVE, INACTIVE, DELETED +} + type ComputeAllocation struct { ID string `json:"id" db:"id"` ProjectID string `json:"project_id" db:"project_id"` diff --git a/pkg/models/project.go b/pkg/models/project.go index d6325959d..879f57841 100644 --- a/pkg/models/project.go +++ b/pkg/models/project.go @@ -3,12 +3,13 @@ package models import "time" type Project struct { - ID string `json:"id" db:"id"` - OriginatedID string `json:"originated_id" db:"originated_id"` // The ID of the project in origination. For example: ACCESS Record ID. - Title string `json:"title" db:"title"` - Origination string `json:"origination" db:"origination"` // ACCESS, NAIRR, XRASS, etc. - ProjectPIID string `json:"project_pi_id" db:"project_pi_id"` - CreatedTime time.Time `json:"created_time" db:"created_time"` + ID string `json:"id" db:"id"` + OriginatedID string `json:"originated_id" db:"originated_id"` // The ID of the project in origination. For example: ACCESS Record ID. + Title string `json:"title" db:"title"` + Origination string `json:"origination" db:"origination"` // ACCESS, NAIRR, XRASS, etc. + ProjectPIID string `json:"project_pi_id" db:"project_pi_id"` + Status AllocationStatus `json:"status" db:"status"` // ACTIVE, INACTIVE, DELETED + CreatedTime time.Time `json:"created_time" db:"created_time"` } type Organization struct { @@ -17,11 +18,34 @@ type Organization struct { Name string `json:"name" db:"name"` } +type UserStatus string + +const ( + UserActive UserStatus = "ACTIVE" + UserInactive UserStatus = "INACTIVE" + // UserMerged marks the retiring user after a Service.MergeUsers call. + // The row is kept as a mapping so historical references stay resolvable, the + // linkage to the surviving user lives in the user_merges table. + UserMerged UserStatus = "MERGED" +) + type User struct { - ID string `json:"id" db:"id"` - OrganizationID string `json:"organization_id" db:"organization_id"` - FirstName string `json:"first_name" db:"first_name"` - LastName string `json:"last_name" db:"last_name"` - MiddleName string `json:"middle_name,omitempty" db:"middle_name"` - Email string `json:"email" db:"email"` + ID string `json:"id" db:"id"` + OrganizationID string `json:"organization_id" db:"organization_id"` + FirstName string `json:"first_name" db:"first_name"` + LastName string `json:"last_name" db:"last_name"` + MiddleName string `json:"middle_name,omitempty" db:"middle_name"` + Email string `json:"email" db:"email"` + Status UserStatus `json:"status" db:"status"` +} + +// UserMerge records the linkage produced by Service.MergeUsers. The retiring +// user's row is kept (with status=MERGED), so historical references remain +// resolvable. +type UserMerge struct { + ID int64 `json:"id" db:"id"` + RetiringUserID string `json:"retiring_user_id" db:"retiring_user_id"` + SurvivingUserID string `json:"surviving_user_id" db:"surviving_user_id"` + Reason string `json:"reason,omitempty" db:"reason"` + MergedAt time.Time `json:"merged_at" db:"merged_at"` } diff --git a/pkg/service/cluster_account.go b/pkg/service/cluster_account.go new file mode 100644 index 000000000..a19bad270 --- /dev/null +++ b/pkg/service/cluster_account.go @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package service + +import ( + "context" + "database/sql" + "fmt" + + "github.com/apache/airavata-custos/pkg/events" + "github.com/apache/airavata-custos/pkg/models" +) + +// CreateClusterAccount persists a new cluster account for a user on a cluster. +// If account.ID is empty, a new UUID is generated. The referenced user and +// cluster must exist; (compute_cluster_id, username) is unique. +func (s *Service) CreateClusterAccount(ctx context.Context, account *models.ClusterAccount) (*models.ClusterAccount, error) { + if account == nil { + return nil, fmt.Errorf("%w: cluster account is nil", ErrInvalidInput) + } + if account.UserID == "" { + return nil, fmt.Errorf("%w: cluster account user_id is required", ErrInvalidInput) + } + if account.ComputeClusterID == "" { + return nil, fmt.Errorf("%w: cluster account compute_cluster_id is required", ErrInvalidInput) + } + if account.Username == "" { + return nil, fmt.Errorf("%w: cluster account username is required", ErrInvalidInput) + } + + if user, err := s.users.FindByID(ctx, account.UserID); err != nil { + return nil, fmt.Errorf("verify user: %w", err) + } else if user == nil { + return nil, fmt.Errorf("%w: user %q does not exist", ErrInvalidInput, account.UserID) + } + if cluster, err := s.clusters.FindByID(ctx, account.ComputeClusterID); err != nil { + return nil, fmt.Errorf("verify compute cluster: %w", err) + } else if cluster == nil { + return nil, fmt.Errorf("%w: compute cluster %q does not exist", ErrInvalidInput, account.ComputeClusterID) + } + + if existing, err := s.clusterAccounts.FindByClusterAndUsername(ctx, account.ComputeClusterID, account.Username); err != nil { + return nil, fmt.Errorf("lookup cluster account: %w", err) + } else if existing != nil { + return nil, fmt.Errorf("%w: cluster account %q on cluster %q", ErrAlreadyExists, account.Username, account.ComputeClusterID) + } + + if account.ID == "" { + account.ID = newID() + } + if account.Status == "" { + account.Status = models.ACTIVE + } + + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.clusterAccounts.Create(ctx, tx, account) + }); err != nil { + return nil, fmt.Errorf("create cluster account: %w", err) + } + + s.eventBus.Publish(events.ClusterAccountCreateEvent, account) + return account, nil +} + +// GetClusterAccount retrieves a cluster account by ID. +func (s *Service) GetClusterAccount(ctx context.Context, id string) (*models.ClusterAccount, error) { + a, err := s.clusterAccounts.FindByID(ctx, id) + if err != nil { + return nil, fmt.Errorf("get cluster account: %w", err) + } + if a == nil { + return nil, ErrNotFound + } + return a, nil +} + +// GetClusterAccountByClusterAndUsername resolves a cluster account by its +// natural key. +func (s *Service) GetClusterAccountByClusterAndUsername(ctx context.Context, clusterID, username string) (*models.ClusterAccount, error) { + a, err := s.clusterAccounts.FindByClusterAndUsername(ctx, clusterID, username) + if err != nil { + return nil, fmt.Errorf("get cluster account by cluster/username: %w", err) + } + if a == nil { + return nil, ErrNotFound + } + return a, nil +} + +// ListClusterAccountsForUser returns every cluster account belonging to a user. +func (s *Service) ListClusterAccountsForUser(ctx context.Context, userID string) ([]models.ClusterAccount, error) { + out, err := s.clusterAccounts.FindByUser(ctx, userID) + if err != nil { + return nil, fmt.Errorf("list cluster accounts by user: %w", err) + } + return out, nil +} + +// ListClusterAccountsForCluster returns every cluster account on a cluster. +func (s *Service) ListClusterAccountsForCluster(ctx context.Context, clusterID string) ([]models.ClusterAccount, error) { + out, err := s.clusterAccounts.FindByCluster(ctx, clusterID) + if err != nil { + return nil, fmt.Errorf("list cluster accounts by cluster: %w", err) + } + return out, nil +} + +// UpdateClusterAccountStatus flips only the lifecycle status. +func (s *Service) UpdateClusterAccountStatus(ctx context.Context, id string, status models.AllocationStatus) (*models.ClusterAccount, error) { + if id == "" { + return nil, fmt.Errorf("%w: cluster account id is required", ErrInvalidInput) + } + switch status { + case models.ACTIVE, models.INACTIVE, models.DELETED: + default: + return nil, fmt.Errorf("%w: invalid cluster account status %q", ErrInvalidInput, status) + } + + a, err := s.clusterAccounts.FindByID(ctx, id) + if err != nil { + return nil, fmt.Errorf("lookup cluster account: %w", err) + } + if a == nil { + return nil, ErrNotFound + } + if a.Status == status { + return a, nil + } + + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.clusterAccounts.UpdateStatus(ctx, tx, id, status) + }); err != nil { + return nil, fmt.Errorf("update cluster account status: %w", err) + } + a.Status = status + + s.eventBus.Publish(events.ClusterAccountUpdateEvent, a) + return a, nil +} + +// DeleteClusterAccount removes a cluster account by ID. +func (s *Service) DeleteClusterAccount(ctx context.Context, id string) error { + if id == "" { + return fmt.Errorf("%w: cluster account id is required", ErrInvalidInput) + } + a, err := s.clusterAccounts.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup cluster account: %w", err) + } + if a == nil { + return ErrNotFound + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.clusterAccounts.Delete(ctx, tx, id) + }); err != nil { + return fmt.Errorf("delete cluster account: %w", err) + } + + s.eventBus.Publish(events.ClusterAccountDeleteEvent, a) + return nil +} diff --git a/pkg/service/project.go b/pkg/service/project.go index 938ff4f8e..c08150a2c 100644 --- a/pkg/service/project.go +++ b/pkg/service/project.go @@ -56,6 +56,9 @@ func (s *Service) CreateProject(ctx context.Context, project *models.Project) (* if project.ID == "" { project.ID = newID() } + if project.Status == "" { + project.Status = models.ACTIVE + } if project.CreatedTime.IsZero() { project.CreatedTime = nowUTC() } @@ -118,6 +121,39 @@ func (s *Service) UpdateProject(ctx context.Context, project *models.Project) er return nil } +// UpdateProjectStatus flips only the project's lifecycle status. +func (s *Service) UpdateProjectStatus(ctx context.Context, id string, status models.AllocationStatus) (*models.Project, error) { + if id == "" { + return nil, fmt.Errorf("%w: project id is required", ErrInvalidInput) + } + switch status { + case models.ACTIVE, models.INACTIVE, models.DELETED: + default: + return nil, fmt.Errorf("%w: invalid project status %q", ErrInvalidInput, status) + } + + project, err := s.projs.FindByID(ctx, id) + if err != nil { + return nil, fmt.Errorf("lookup project: %w", err) + } + if project == nil { + return nil, ErrNotFound + } + if project.Status == status { + return project, nil + } + + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.projs.UpdateStatus(ctx, tx, id, status) + }); err != nil { + return nil, fmt.Errorf("update project status: %w", err) + } + project.Status = status + + s.eventBus.Publish(events.ProjectUpdateEvent, project) + return project, nil +} + // DeleteProject removes a project by ID. func (s *Service) DeleteProject(ctx context.Context, id string) error { if id == "" { diff --git a/pkg/service/service.go b/pkg/service/service.go index aa9538a52..d361d4af2 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -51,6 +51,8 @@ type Service struct { usages store.ComputeAllocationUsageStore extIDs store.ExternalIdentityStore userDNs store.UserDNStore + clusterAccounts store.ClusterAccountStore + userMerges store.UserMergeStore } // New constructs a Service backed by the supplied database handle. @@ -74,6 +76,8 @@ func New(database *sqlx.DB, eventBus *events.Bus) *Service { usages: store.NewComputeAllocationUsageStore(database), extIDs: store.NewExternalIdentityStore(database), userDNs: store.NewUserDNStore(database), + clusterAccounts: store.NewClusterAccountStore(database), + userMerges: store.NewUserMergeStore(database), } } @@ -98,6 +102,8 @@ func NewWithStores( usages store.ComputeAllocationUsageStore, extIDs store.ExternalIdentityStore, userDNs store.UserDNStore, + clusterAccounts store.ClusterAccountStore, + userMerges store.UserMergeStore, ) *Service { return &Service{ db: database, @@ -117,6 +123,8 @@ func NewWithStores( usages: usages, extIDs: extIDs, userDNs: userDNs, + clusterAccounts: clusterAccounts, + userMerges: userMerges, } } diff --git a/pkg/service/user.go b/pkg/service/user.go index da809881c..d7371979b 100644 --- a/pkg/service/user.go +++ b/pkg/service/user.go @@ -53,6 +53,9 @@ func (s *Service) CreateUser(ctx context.Context, user *models.User) (*models.Us if user.ID == "" { user.ID = newID() } + if user.Status == "" { + user.Status = models.UserActive + } if err := s.inTx(ctx, func(tx *sql.Tx) error { return s.users.Create(ctx, tx, user) @@ -74,6 +77,27 @@ func (s *Service) GetUser(ctx context.Context, id string) (*models.User, error) return u, nil } +// GetUserByExternalIdentity resolves a user via their (source, external_id) +// entry. Returns ErrNotFound when either the external identity does not +// exist or the user it points to has been deleted. +func (s *Service) GetUserByExternalIdentity(ctx context.Context, source, externalID string) (*models.User, error) { + ext, err := s.extIDs.FindBySourceAndExternalID(ctx, source, externalID) + if err != nil { + return nil, fmt.Errorf("lookup external identity: %w", err) + } + if ext == nil { + return nil, ErrNotFound + } + u, err := s.users.FindByID(ctx, ext.UserID) + if err != nil { + return nil, fmt.Errorf("lookup user: %w", err) + } + if u == nil { + return nil, ErrNotFound + } + return u, nil +} + // GetUserByEmail retrieves a user by email. func (s *Service) GetUserByEmail(ctx context.Context, email string) (*models.User, error) { u, err := s.users.FindByEmail(ctx, email) diff --git a/pkg/service/user_merge.go b/pkg/service/user_merge.go new file mode 100644 index 000000000..661bc6d34 --- /dev/null +++ b/pkg/service/user_merge.go @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package service + +import ( + "context" + "database/sql" + "fmt" + + "github.com/apache/airavata-custos/pkg/events" + "github.com/apache/airavata-custos/pkg/models" +) + +// MergeUsers consolidates the retiring user into the surviving user. All +// identity-forward state moves to the survivor. Historical state stays with no change. +// +// Moved to survivor (duplicates on the retiring user are +// dropped first, then the remainder is re-pointed): +// - external_identities +// - user_dns +// - cluster_accounts +// - projects.project_pi_id +// - compute_allocation_memberships +// +// Left in place (historical truth — who actually did the thing): +// - compute_allocation_change_requests (requester / approver) +// - compute_allocation_usages +// +// The retiring user row is flipped to status=MERGED (soft-delete) and a row +// is written to user_merges with the surviving user and the given reason. +// Historical references to the retiring user remain +// resolvable. The linkage lives in user_merges table. +// All work happens in a single transaction. +func (s *Service) MergeUsers(ctx context.Context, survivingID, retiringID, reason string) (*models.User, error) { + if survivingID == "" || retiringID == "" { + return nil, fmt.Errorf("%w: surviving and retiring user IDs are required", ErrInvalidInput) + } + if survivingID == retiringID { + return nil, fmt.Errorf("%w: cannot merge a user with itself", ErrInvalidInput) + } + + survivor, err := s.users.FindByID(ctx, survivingID) + if err != nil { + return nil, fmt.Errorf("lookup surviving user: %w", err) + } + if survivor == nil { + return nil, fmt.Errorf("%w: surviving user %q does not exist", ErrInvalidInput, survivingID) + } + retiring, err := s.users.FindByID(ctx, retiringID) + if err != nil { + return nil, fmt.Errorf("lookup retiring user: %w", err) + } + if retiring == nil { + return nil, fmt.Errorf("%w: retiring user %q does not exist", ErrInvalidInput, retiringID) + } + + if err := s.inTx(ctx, func(tx *sql.Tx) error { + if err := s.extIDs.ReassignUser(ctx, tx, retiringID, survivingID); err != nil { + return fmt.Errorf("reassign external identities: %w", err) + } + if err := s.userDNs.ReassignUser(ctx, tx, retiringID, survivingID); err != nil { + return fmt.Errorf("reassign user dns: %w", err) + } + if err := s.clusterAccounts.ReassignUser(ctx, tx, retiringID, survivingID); err != nil { + return fmt.Errorf("reassign cluster accounts: %w", err) + } + if err := s.projs.ReassignPI(ctx, tx, retiringID, survivingID); err != nil { + return fmt.Errorf("reassign project PI: %w", err) + } + if err := s.memberships.ReassignUser(ctx, tx, retiringID, survivingID); err != nil { + return fmt.Errorf("reassign memberships: %w", err) + } + if err := s.users.UpdateStatus(ctx, tx, retiringID, models.UserMerged); err != nil { + return fmt.Errorf("mark retiring user merged: %w", err) + } + return s.userMerges.Record(ctx, tx, retiringID, survivingID, reason) + }); err != nil { + return nil, fmt.Errorf("merge users: %w", err) + } + + retiring.Status = models.UserMerged + s.eventBus.Publish(events.UserUpdateEvent, retiring) + s.eventBus.Publish(events.UserUpdateEvent, survivor) + return survivor, nil +}
