This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit 50f03d8eeae366e018308576415c661dbc0e5dd8
Author: lahiruj <[email protected]>
AuthorDate: Tue May 19 02:11:37 2026 -0400

    Add Project.Status, ClusterAccount, UserMerge, and identity helpers to core
---
 .../db/migrations/000011_projects_status.down.sql  |  20 +++
 .../db/migrations/000011_projects_status.up.sql    |  20 +++
 .../db/migrations/000012_cluster_accounts.down.sql |  18 +++
 .../db/migrations/000012_cluster_accounts.up.sql   |  34 ++++
 .../db/migrations/000013_users_status.down.sql     |  20 +++
 internal/db/migrations/000013_users_status.up.sql  |  20 +++
 internal/db/migrations/000014_user_merges.down.sql |  18 +++
 internal/db/migrations/000014_user_merges.up.sql   |  30 ++++
 internal/server/server.go                          | 138 +++++++++++++++-
 internal/store/cluster_account_store.go            | 128 +++++++++++++++
 .../store/compute_allocation_membership_store.go   |  20 +++
 internal/store/external_identity_store.go          |   7 +
 internal/store/project_store.go                    |  30 +++-
 internal/store/store.go                            |  59 ++++++-
 internal/store/user_dn_store.go                    |  15 ++
 internal/store/user_merge_store.go                 |  77 +++++++++
 internal/store/user_store.go                       |  24 +--
 pkg/events/cluster_account_subscribe.go            |  63 ++++++++
 pkg/events/external_identity_subscribe.go          |  17 ++
 pkg/events/user_dn_subscribe.go                    |  17 ++
 pkg/models/allocation.go                           |  11 ++
 pkg/models/project.go                              |  48 ++++--
 pkg/service/cluster_account.go                     | 176 +++++++++++++++++++++
 pkg/service/project.go                             |  36 +++++
 pkg/service/service.go                             |   8 +
 pkg/service/user.go                                |  24 +++
 pkg/service/user_merge.go                          | 100 ++++++++++++
 27 files changed, 1142 insertions(+), 36 deletions(-)

diff --git a/internal/db/migrations/000011_projects_status.down.sql 
b/internal/db/migrations/000011_projects_status.down.sql
new file mode 100644
index 000000000..136999f6f
--- /dev/null
+++ b/internal/db/migrations/000011_projects_status.down.sql
@@ -0,0 +1,20 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+ALTER TABLE projects
+    DROP KEY idx_projects_status,
+    DROP COLUMN status;
diff --git a/internal/db/migrations/000011_projects_status.up.sql 
b/internal/db/migrations/000011_projects_status.up.sql
new file mode 100644
index 000000000..1de30ed61
--- /dev/null
+++ b/internal/db/migrations/000011_projects_status.up.sql
@@ -0,0 +1,20 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+ALTER TABLE projects
+    ADD COLUMN status VARCHAR(32) NOT NULL DEFAULT 'ACTIVE' AFTER 
project_pi_id,
+    ADD KEY idx_projects_status (status);
diff --git a/internal/db/migrations/000012_cluster_accounts.down.sql 
b/internal/db/migrations/000012_cluster_accounts.down.sql
new file mode 100644
index 000000000..f3ca1be95
--- /dev/null
+++ b/internal/db/migrations/000012_cluster_accounts.down.sql
@@ -0,0 +1,18 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+DROP TABLE IF EXISTS cluster_accounts;
diff --git a/internal/db/migrations/000012_cluster_accounts.up.sql 
b/internal/db/migrations/000012_cluster_accounts.up.sql
new file mode 100644
index 000000000..3eab32279
--- /dev/null
+++ b/internal/db/migrations/000012_cluster_accounts.up.sql
@@ -0,0 +1,34 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE TABLE IF NOT EXISTS cluster_accounts
+(
+    id                 VARCHAR(255) NOT NULL,
+    user_id            VARCHAR(255) NOT NULL,
+    compute_cluster_id VARCHAR(255) NOT NULL,
+    username           VARCHAR(255) NOT NULL,
+    status             VARCHAR(32)  NOT NULL DEFAULT 'ACTIVE',
+    created_at         TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+    updated_at         TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON 
UPDATE CURRENT_TIMESTAMP(6),
+    PRIMARY KEY (id),
+    UNIQUE KEY uq_cluster_accounts_cluster_username (compute_cluster_id, 
username),
+    KEY idx_cluster_accounts_user (user_id),
+    KEY idx_cluster_accounts_cluster (compute_cluster_id),
+    KEY idx_cluster_accounts_status (status),
+    CONSTRAINT fk_cluster_accounts_user FOREIGN KEY (user_id) REFERENCES users 
(id) ON DELETE CASCADE,
+    CONSTRAINT fk_cluster_accounts_cluster FOREIGN KEY (compute_cluster_id) 
REFERENCES compute_clusters (id) ON DELETE CASCADE
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
diff --git a/internal/db/migrations/000013_users_status.down.sql 
b/internal/db/migrations/000013_users_status.down.sql
new file mode 100644
index 000000000..8c5d8445d
--- /dev/null
+++ b/internal/db/migrations/000013_users_status.down.sql
@@ -0,0 +1,20 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+ALTER TABLE users
+    DROP KEY idx_users_status,
+    DROP COLUMN status;
diff --git a/internal/db/migrations/000013_users_status.up.sql 
b/internal/db/migrations/000013_users_status.up.sql
new file mode 100644
index 000000000..5b88416a6
--- /dev/null
+++ b/internal/db/migrations/000013_users_status.up.sql
@@ -0,0 +1,20 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+ALTER TABLE users
+    ADD COLUMN status VARCHAR(32) NOT NULL DEFAULT 'ACTIVE' AFTER email,
+    ADD KEY idx_users_status (status);
diff --git a/internal/db/migrations/000014_user_merges.down.sql 
b/internal/db/migrations/000014_user_merges.down.sql
new file mode 100644
index 000000000..bc65c15a1
--- /dev/null
+++ b/internal/db/migrations/000014_user_merges.down.sql
@@ -0,0 +1,18 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+DROP TABLE IF EXISTS user_merges;
diff --git a/internal/db/migrations/000014_user_merges.up.sql 
b/internal/db/migrations/000014_user_merges.up.sql
new file mode 100644
index 000000000..7aeeb8844
--- /dev/null
+++ b/internal/db/migrations/000014_user_merges.up.sql
@@ -0,0 +1,30 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE TABLE IF NOT EXISTS user_merges
+(
+    id                BIGINT       NOT NULL AUTO_INCREMENT,
+    retiring_user_id  VARCHAR(255) NOT NULL,
+    surviving_user_id VARCHAR(255) NOT NULL,
+    reason            TEXT         NULL,
+    merged_at         TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+    PRIMARY KEY (id),
+    UNIQUE KEY uq_user_merges_retiring (retiring_user_id),
+    KEY idx_user_merges_surviving (surviving_user_id),
+    CONSTRAINT fk_user_merges_retiring  FOREIGN KEY (retiring_user_id)  
REFERENCES users (id) ON DELETE RESTRICT,
+    CONSTRAINT fk_user_merges_surviving FOREIGN KEY (surviving_user_id) 
REFERENCES users (id) ON DELETE RESTRICT
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
diff --git a/internal/server/server.go b/internal/server/server.go
index 56b6a6bd7..09f33d22b 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -56,9 +56,12 @@ func (s *Server) routes() {
 
        s.mux.HandleFunc("POST /users", s.createUser)
        s.mux.HandleFunc("GET /users/{id}", s.getUser)
+       s.mux.HandleFunc("GET /users/external-identity/{source}/{externalId}", 
s.getUserByExternalIdentity)
+       s.mux.HandleFunc("POST /users/merge", s.mergeUsers)
 
        s.mux.HandleFunc("POST /projects", s.createProject)
        s.mux.HandleFunc("GET /projects/{id}", s.getProject)
+       s.mux.HandleFunc("PUT /projects/{id}/status", s.updateProjectStatus)
 
        s.mux.HandleFunc("POST /compute-clusters", s.createComputeCluster)
        s.mux.HandleFunc("GET /compute-clusters", s.listComputeClusters)
@@ -117,19 +120,27 @@ func (s *Server) routes() {
        s.mux.HandleFunc("GET 
/compute-allocations/{id}/users/{userId}/usages/total", 
s.getTotalSUUsageForUserInAllocation)
        s.mux.HandleFunc("GET /users/{id}/compute-allocation-usages", 
s.listUsagesByUser)
 
+       s.mux.HandleFunc("POST /cluster-accounts", s.createClusterAccount)
+       s.mux.HandleFunc("GET /cluster-accounts/{id}", s.getClusterAccount)
+       s.mux.HandleFunc("PUT /cluster-accounts/{id}/status", 
s.updateClusterAccountStatus)
+       s.mux.HandleFunc("DELETE /cluster-accounts/{id}", 
s.deleteClusterAccount)
+       s.mux.HandleFunc("GET /compute-clusters/{id}/accounts", 
s.listClusterAccountsForCluster)
+       s.mux.HandleFunc("GET /compute-clusters/{id}/accounts/{username}", 
s.getClusterAccountByClusterAndUsername)
+       s.mux.HandleFunc("GET /users/{id}/cluster-accounts", 
s.listClusterAccountsForUser)
+
        s.mux.HandleFunc("POST /external-identities", s.createExternalIdentity)
        s.mux.HandleFunc("GET /external-identities/{id}", s.getExternalIdentity)
        s.mux.HandleFunc("PUT /external-identities/{id}", 
s.updateExternalIdentity)
        s.mux.HandleFunc("DELETE /external-identities/{id}", 
s.deleteExternalIdentity)
-       s.mux.HandleFunc("GET 
/external-identities/by-source/{source}/{externalId}", 
s.getExternalIdentityBySource)
-       s.mux.HandleFunc("GET /external-identities/by-oidc-sub/{sub}", 
s.getExternalIdentityByOIDCSub)
+       s.mux.HandleFunc("GET 
/external-identities/source/{source}/{externalId}", 
s.getExternalIdentityBySource)
+       s.mux.HandleFunc("GET /external-identities/oidc-sub/{sub}", 
s.getExternalIdentityByOIDCSub)
        s.mux.HandleFunc("GET /users/{id}/external-identities", 
s.listExternalIdentitiesForUser)
 
        s.mux.HandleFunc("POST /users/{id}/dns", s.addUserDN)
        s.mux.HandleFunc("GET /user-dns/{id}", s.getUserDN)
        s.mux.HandleFunc("DELETE /user-dns/{id}", s.removeUserDN)
        s.mux.HandleFunc("GET /users/{id}/dns", s.listUserDNs)
-       s.mux.HandleFunc("GET /user-dns/by-dn", s.getUserDNByDN)
+       s.mux.HandleFunc("GET /user-dns", s.getUserDNByDN)
 }
 
 func (s *Server) healthz(w http.ResponseWriter, _ *http.Request) {
@@ -182,6 +193,33 @@ func (s *Server) getUser(w http.ResponseWriter, r 
*http.Request) {
        writeJSON(w, http.StatusOK, u)
 }
 
+func (s *Server) getUserByExternalIdentity(w http.ResponseWriter, r 
*http.Request) {
+       u, err := s.svc.GetUserByExternalIdentity(r.Context(), 
r.PathValue("source"), r.PathValue("externalId"))
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, u)
+}
+
+func (s *Server) mergeUsers(w http.ResponseWriter, r *http.Request) {
+       var body struct {
+               SurvivingUserID string `json:"surviving_user_id"`
+               RetiringUserID  string `json:"retiring_user_id"`
+               Reason          string `json:"reason,omitempty"`
+       }
+       if err := decodeJSON(r, &body); err != nil {
+               writeError(w, http.StatusBadRequest, err)
+               return
+       }
+       survivor, err := s.svc.MergeUsers(r.Context(), body.SurvivingUserID, 
body.RetiringUserID, body.Reason)
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, survivor)
+}
+
 func (s *Server) createProject(w http.ResponseWriter, r *http.Request) {
        var p models.Project
        if err := decodeJSON(r, &p); err != nil {
@@ -720,6 +758,96 @@ func (s *Server) getTotalSUUsageForUserInAllocation(w 
http.ResponseWriter, r *ht
        })
 }
 
+func (s *Server) updateProjectStatus(w http.ResponseWriter, r *http.Request) {
+       var body struct {
+               Status models.AllocationStatus `json:"status"`
+       }
+       if err := decodeJSON(r, &body); err != nil {
+               writeError(w, http.StatusBadRequest, err)
+               return
+       }
+       updated, err := s.svc.UpdateProjectStatus(r.Context(), 
r.PathValue("id"), body.Status)
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, updated)
+}
+
+func (s *Server) createClusterAccount(w http.ResponseWriter, r *http.Request) {
+       var a models.ClusterAccount
+       if err := decodeJSON(r, &a); err != nil {
+               writeError(w, http.StatusBadRequest, err)
+               return
+       }
+       created, err := s.svc.CreateClusterAccount(r.Context(), &a)
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusCreated, created)
+}
+
+func (s *Server) getClusterAccount(w http.ResponseWriter, r *http.Request) {
+       a, err := s.svc.GetClusterAccount(r.Context(), r.PathValue("id"))
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, a)
+}
+
+func (s *Server) updateClusterAccountStatus(w http.ResponseWriter, r 
*http.Request) {
+       var body struct {
+               Status models.AllocationStatus `json:"status"`
+       }
+       if err := decodeJSON(r, &body); err != nil {
+               writeError(w, http.StatusBadRequest, err)
+               return
+       }
+       updated, err := s.svc.UpdateClusterAccountStatus(r.Context(), 
r.PathValue("id"), body.Status)
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, updated)
+}
+
+func (s *Server) deleteClusterAccount(w http.ResponseWriter, r *http.Request) {
+       if err := s.svc.DeleteClusterAccount(r.Context(), r.PathValue("id")); 
err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       w.WriteHeader(http.StatusNoContent)
+}
+
+func (s *Server) listClusterAccountsForCluster(w http.ResponseWriter, r 
*http.Request) {
+       out, err := s.svc.ListClusterAccountsForCluster(r.Context(), 
r.PathValue("id"))
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, out)
+}
+
+func (s *Server) getClusterAccountByClusterAndUsername(w http.ResponseWriter, 
r *http.Request) {
+       a, err := s.svc.GetClusterAccountByClusterAndUsername(r.Context(), 
r.PathValue("id"), r.PathValue("username"))
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, a)
+}
+
+func (s *Server) listClusterAccountsForUser(w http.ResponseWriter, r 
*http.Request) {
+       out, err := s.svc.ListClusterAccountsForUser(r.Context(), 
r.PathValue("id"))
+       if err != nil {
+               writeServiceError(w, err)
+               return
+       }
+       writeJSON(w, http.StatusOK, out)
+}
+
 func (s *Server) createExternalIdentity(w http.ResponseWriter, r 
*http.Request) {
        var e models.ExternalIdentity
        if err := decodeJSON(r, &e); err != nil {
@@ -762,7 +890,7 @@ func (s *Server) deleteExternalIdentity(w 
http.ResponseWriter, r *http.Request)
                writeServiceError(w, err)
                return
        }
-       writeJSON(w, http.StatusNoContent, nil)
+       w.WriteHeader(http.StatusNoContent)
 }
 
 func (s *Server) getExternalIdentityBySource(w http.ResponseWriter, r 
*http.Request) {
@@ -821,7 +949,7 @@ func (s *Server) removeUserDN(w http.ResponseWriter, r 
*http.Request) {
                writeServiceError(w, err)
                return
        }
-       writeJSON(w, http.StatusNoContent, nil)
+       w.WriteHeader(http.StatusNoContent)
 }
 
 func (s *Server) listUserDNs(w http.ResponseWriter, r *http.Request) {
diff --git a/internal/store/cluster_account_store.go 
b/internal/store/cluster_account_store.go
new file mode 100644
index 000000000..06012421d
--- /dev/null
+++ b/internal/store/cluster_account_store.go
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+
+       "github.com/jmoiron/sqlx"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+type mysqlClusterAccountStore struct {
+       db *sqlx.DB
+}
+
+// NewClusterAccountStore returns a MySQL-backed ClusterAccountStore.
+func NewClusterAccountStore(db *sqlx.DB) ClusterAccountStore {
+       return &mysqlClusterAccountStore{db: db}
+}
+
+const clusterAccountColumns = `id, user_id, compute_cluster_id, username, 
status`
+
+func (s *mysqlClusterAccountStore) FindByID(ctx context.Context, id string) 
(*models.ClusterAccount, error) {
+       var a models.ClusterAccount
+       err := s.db.GetContext(ctx, &a,
+               `SELECT `+clusterAccountColumns+` FROM cluster_accounts WHERE 
id = ?`, id)
+       if err != nil {
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, nil
+               }
+               return nil, err
+       }
+       return &a, nil
+}
+
+func (s *mysqlClusterAccountStore) FindByClusterAndUsername(ctx 
context.Context, clusterID, username string) (*models.ClusterAccount, error) {
+       var a models.ClusterAccount
+       err := s.db.GetContext(ctx, &a,
+               `SELECT `+clusterAccountColumns+` FROM cluster_accounts WHERE 
compute_cluster_id = ? AND username = ?`,
+               clusterID, username)
+       if err != nil {
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, nil
+               }
+               return nil, err
+       }
+       return &a, nil
+}
+
+func (s *mysqlClusterAccountStore) FindByUser(ctx context.Context, userID 
string) ([]models.ClusterAccount, error) {
+       var out []models.ClusterAccount
+       err := s.db.SelectContext(ctx, &out,
+               `SELECT `+clusterAccountColumns+` FROM cluster_accounts WHERE 
user_id = ? ORDER BY created_at ASC`,
+               userID)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
+
+func (s *mysqlClusterAccountStore) FindByCluster(ctx context.Context, 
clusterID string) ([]models.ClusterAccount, error) {
+       var out []models.ClusterAccount
+       err := s.db.SelectContext(ctx, &out,
+               `SELECT `+clusterAccountColumns+` FROM cluster_accounts WHERE 
compute_cluster_id = ? ORDER BY created_at ASC`,
+               clusterID)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
+
+func (s *mysqlClusterAccountStore) Create(ctx context.Context, tx *sql.Tx, a 
*models.ClusterAccount) error {
+       _, err := tx.ExecContext(ctx,
+               `INSERT INTO cluster_accounts (id, user_id, compute_cluster_id, 
username, status)
+                VALUES (?, ?, ?, ?, ?)`,
+               a.ID, a.UserID, a.ComputeClusterID, a.Username, a.Status)
+       return err
+}
+
+func (s *mysqlClusterAccountStore) UpdateStatus(ctx context.Context, tx 
*sql.Tx, id string, status models.AllocationStatus) error {
+       _, err := tx.ExecContext(ctx,
+               `UPDATE cluster_accounts SET status = ? WHERE id = ?`,
+               status, id)
+       return err
+}
+
+func (s *mysqlClusterAccountStore) ReassignUser(ctx context.Context, tx 
*sql.Tx, fromUserID, toUserID string) error {
+       // Drop fromUserID's accounts whose (cluster, username) the survivor 
already
+       // owns, then move the rest.
+       if _, err := tx.ExecContext(ctx,
+               `DELETE FROM cluster_accounts
+                WHERE user_id = ?
+                  AND (compute_cluster_id, username) IN (
+                      SELECT compute_cluster_id, username FROM (
+                          SELECT compute_cluster_id, username FROM 
cluster_accounts WHERE user_id = ?
+                      ) AS s
+                  )`,
+               fromUserID, toUserID); err != nil {
+               return err
+       }
+       _, err := tx.ExecContext(ctx,
+               `UPDATE cluster_accounts SET user_id = ? WHERE user_id = ?`,
+               toUserID, fromUserID)
+       return err
+}
+
+func (s *mysqlClusterAccountStore) Delete(ctx context.Context, tx *sql.Tx, id 
string) error {
+       _, err := tx.ExecContext(ctx, `DELETE FROM cluster_accounts WHERE id = 
?`, id)
+       return err
+}
diff --git a/internal/store/compute_allocation_membership_store.go 
b/internal/store/compute_allocation_membership_store.go
index aae350ac9..21010108c 100644
--- a/internal/store/compute_allocation_membership_store.go
+++ b/internal/store/compute_allocation_membership_store.go
@@ -116,6 +116,26 @@ func (s *mysqlComputeAllocationMembershipStore) Update(ctx 
context.Context, tx *
        return err
 }
 
+func (s *mysqlComputeAllocationMembershipStore) ReassignUser(ctx 
context.Context, tx *sql.Tx, fromUserID, toUserID string) error {
+       // Drop fromUserID's memberships in allocations the survivor already 
belongs
+       // to, then move the rest.
+       if _, err := tx.ExecContext(ctx,
+               `DELETE FROM compute_allocation_memberships
+                WHERE user_id = ?
+                  AND compute_allocation_id IN (
+                      SELECT compute_allocation_id FROM (
+                          SELECT compute_allocation_id FROM 
compute_allocation_memberships WHERE user_id = ?
+                      ) AS s
+                  )`,
+               fromUserID, toUserID); err != nil {
+               return err
+       }
+       _, err := tx.ExecContext(ctx,
+               `UPDATE compute_allocation_memberships SET user_id = ? WHERE 
user_id = ?`,
+               toUserID, fromUserID)
+       return err
+}
+
 func (s *mysqlComputeAllocationMembershipStore) Delete(ctx context.Context, tx 
*sql.Tx, id string) error {
        _, err := tx.ExecContext(ctx, `DELETE FROM 
compute_allocation_memberships WHERE id = ?`, id)
        return err
diff --git a/internal/store/external_identity_store.go 
b/internal/store/external_identity_store.go
index 41f9e099c..2b3bac182 100644
--- a/internal/store/external_identity_store.go
+++ b/internal/store/external_identity_store.go
@@ -106,6 +106,13 @@ func (s *mysqlExternalIdentityStore) Update(ctx 
context.Context, tx *sql.Tx, e *
        return err
 }
 
+func (s *mysqlExternalIdentityStore) ReassignUser(ctx context.Context, tx 
*sql.Tx, fromUserID, toUserID string) error {
+       _, err := tx.ExecContext(ctx,
+               `UPDATE external_identities SET user_id = ? WHERE user_id = ?`,
+               toUserID, fromUserID)
+       return err
+}
+
 func (s *mysqlExternalIdentityStore) Delete(ctx context.Context, tx *sql.Tx, 
id string) error {
        _, err := tx.ExecContext(ctx, `DELETE FROM external_identities WHERE id 
= ?`, id)
        return err
diff --git a/internal/store/project_store.go b/internal/store/project_store.go
index fc447500a..226836f35 100644
--- a/internal/store/project_store.go
+++ b/internal/store/project_store.go
@@ -39,7 +39,7 @@ func NewProjectStore(db *sqlx.DB) ProjectStore {
 func (s *mysqlProjectStore) FindByID(ctx context.Context, id string) 
(*models.Project, error) {
        var p models.Project
        err := s.db.GetContext(ctx, &p,
-               `SELECT id, originated_id, title, origination, project_pi_id, 
created_time
+               `SELECT id, originated_id, title, origination, project_pi_id, 
status, created_time
                 FROM projects WHERE id = ?`, id)
        if err != nil {
                if errors.Is(err, sql.ErrNoRows) {
@@ -53,7 +53,7 @@ func (s *mysqlProjectStore) FindByID(ctx context.Context, id 
string) (*models.Pr
 func (s *mysqlProjectStore) FindByOriginatedID(ctx context.Context, 
originatedID string) (*models.Project, error) {
        var p models.Project
        err := s.db.GetContext(ctx, &p,
-               `SELECT id, originated_id, title, origination, project_pi_id, 
created_time
+               `SELECT id, originated_id, title, origination, project_pi_id, 
status, created_time
                 FROM projects WHERE originated_id = ?`, originatedID)
        if err != nil {
                if errors.Is(err, sql.ErrNoRows) {
@@ -67,7 +67,7 @@ func (s *mysqlProjectStore) FindByOriginatedID(ctx 
context.Context, originatedID
 func (s *mysqlProjectStore) FindByPI(ctx context.Context, piUserID string) 
([]models.Project, error) {
        var projects []models.Project
        err := s.db.SelectContext(ctx, &projects,
-               `SELECT id, originated_id, title, origination, project_pi_id, 
created_time
+               `SELECT id, originated_id, title, origination, project_pi_id, 
status, created_time
                 FROM projects WHERE project_pi_id = ?`, piUserID)
        if err != nil {
                return nil, err
@@ -77,17 +77,31 @@ func (s *mysqlProjectStore) FindByPI(ctx context.Context, 
piUserID string) ([]mo
 
 func (s *mysqlProjectStore) Create(ctx context.Context, tx *sql.Tx, p 
*models.Project) error {
        _, err := tx.ExecContext(ctx,
-               `INSERT INTO projects (id, originated_id, title, origination, 
project_pi_id, created_time)
-                VALUES (?, ?, ?, ?, ?, ?)`,
-               p.ID, p.OriginatedID, p.Title, p.Origination, p.ProjectPIID, 
p.CreatedTime)
+               `INSERT INTO projects (id, originated_id, title, origination, 
project_pi_id, status, created_time)
+                VALUES (?, ?, ?, ?, ?, ?, ?)`,
+               p.ID, p.OriginatedID, p.Title, p.Origination, p.ProjectPIID, 
p.Status, p.CreatedTime)
        return err
 }
 
 func (s *mysqlProjectStore) Update(ctx context.Context, tx *sql.Tx, p 
*models.Project) error {
        _, err := tx.ExecContext(ctx,
-               `UPDATE projects SET originated_id = ?, title = ?, origination 
= ?, project_pi_id = ?
+               `UPDATE projects SET originated_id = ?, title = ?, origination 
= ?, project_pi_id = ?, status = ?
                 WHERE id = ?`,
-               p.OriginatedID, p.Title, p.Origination, p.ProjectPIID, p.ID)
+               p.OriginatedID, p.Title, p.Origination, p.ProjectPIID, 
p.Status, p.ID)
+       return err
+}
+
+func (s *mysqlProjectStore) UpdateStatus(ctx context.Context, tx *sql.Tx, id 
string, status models.AllocationStatus) error {
+       _, err := tx.ExecContext(ctx,
+               `UPDATE projects SET status = ? WHERE id = ?`,
+               status, id)
+       return err
+}
+
+func (s *mysqlProjectStore) ReassignPI(ctx context.Context, tx *sql.Tx, 
fromUserID, toUserID string) error {
+       _, err := tx.ExecContext(ctx,
+               `UPDATE projects SET project_pi_id = ? WHERE project_pi_id = ?`,
+               toUserID, fromUserID)
        return err
 }
 
diff --git a/internal/store/store.go b/internal/store/store.go
index 2ba3caced..355788a99 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -35,12 +35,31 @@ type UserStore interface {
        FindByOrganization(ctx context.Context, organizationID string) 
([]models.User, error)
        // Create inserts a new user within the provided transaction.
        Create(ctx context.Context, tx *sql.Tx, u *models.User) error
-       // Update replaces mutable fields of an existing user within the 
provided transaction.
+       // Update replaces mutable fields of an existing user within the 
provided
+       // transaction. Does NOT touch status — route status changes through
+       // UpdateStatus.
        Update(ctx context.Context, tx *sql.Tx, u *models.User) error
-       // Delete removes a user by ID within the provided transaction.
+       // UpdateStatus flips only the lifecycle status (ACTIVE / INACTIVE / 
MERGED).
+       UpdateStatus(ctx context.Context, tx *sql.Tx, id string, status 
models.UserStatus) error
+       // Delete removes a user by ID within the provided transaction. This is 
the
+       // explicit hard-delete. The user-merge flow uses UpdateStatus(MERGED) 
and a
+       // row in user_merges instead.
        Delete(ctx context.Context, tx *sql.Tx, id string) error
 }
 
+// UserMergeStore defines persistence operations for the user_merges audit
+// table, which records the linkage produced by Service.MergeUsers.
+type UserMergeStore interface {
+       // Record inserts a merge linkage. Reason may be empty.
+       Record(ctx context.Context, tx *sql.Tx, retiringUserID, 
survivingUserID, reason string) error
+       // FindByRetiringUser returns the single merge row whose retiring user
+       // matches, or nil if absent.
+       FindByRetiringUser(ctx context.Context, retiringUserID string) 
(*models.UserMerge, error)
+       // FindBySurvivingUser returns every merge row whose surviving user 
matches,
+       // ordered by merged_at ascending.
+       FindBySurvivingUser(ctx context.Context, survivingUserID string) 
([]models.UserMerge, error)
+}
+
 // OrganizationStore defines persistence operations for organizations.
 type OrganizationStore interface {
        // FindByID returns the organization with the given ID, or nil if not 
found.
@@ -83,6 +102,11 @@ type ProjectStore interface {
        Create(ctx context.Context, tx *sql.Tx, p *models.Project) error
        // Update replaces mutable fields of an existing project within the 
provided transaction.
        Update(ctx context.Context, tx *sql.Tx, p *models.Project) error
+       // UpdateStatus updates only the status field of an existing project.
+       UpdateStatus(ctx context.Context, tx *sql.Tx, id string, status 
models.AllocationStatus) error
+       // ReassignPI re-points every project's project_pi_id from fromUserID to
+       // toUserID.
+       ReassignPI(ctx context.Context, tx *sql.Tx, fromUserID, toUserID 
string) error
        // Delete removes a project by ID within the provided transaction.
        Delete(ctx context.Context, tx *sql.Tx, id string) error
 }
@@ -227,10 +251,36 @@ type ComputeAllocationMembershipStore interface {
        Create(ctx context.Context, tx *sql.Tx, m 
*models.ComputeAllocationMembership) error
        // Update replaces mutable fields of an existing membership within the 
provided transaction.
        Update(ctx context.Context, tx *sql.Tx, m 
*models.ComputeAllocationMembership) error
+       // ReassignUser re-points every membership's user_id from fromUserID to
+       // toUserID.
+       ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID 
string) error
        // Delete removes a membership by ID within the provided transaction.
        Delete(ctx context.Context, tx *sql.Tx, id string) error
 }
 
+// ClusterAccountStore defines persistence operations for posix-style
+// accounts provisioned for a user on a specific compute cluster.
+// (compute_cluster_id, username) is unique.
+type ClusterAccountStore interface {
+       // FindByID returns the cluster account with the given ID, or nil if 
absent.
+       FindByID(ctx context.Context, id string) (*models.ClusterAccount, error)
+       // FindByClusterAndUsername returns the account for a (cluster, 
username)
+       // pair, or nil if absent.
+       FindByClusterAndUsername(ctx context.Context, clusterID, username 
string) (*models.ClusterAccount, error)
+       // FindByUser returns every cluster account belonging to the given user.
+       FindByUser(ctx context.Context, userID string) 
([]models.ClusterAccount, error)
+       // FindByCluster returns every cluster account on the given cluster.
+       FindByCluster(ctx context.Context, clusterID string) 
([]models.ClusterAccount, error)
+       // Create inserts a new cluster account within the provided transaction.
+       Create(ctx context.Context, tx *sql.Tx, a *models.ClusterAccount) error
+       // UpdateStatus updates only the status field.
+       UpdateStatus(ctx context.Context, tx *sql.Tx, id string, status 
models.AllocationStatus) error
+       // ReassignUser re-points every account from fromUserID to toUserID.
+       ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID 
string) error
+       // Delete removes a cluster account by ID within the provided 
transaction.
+       Delete(ctx context.Context, tx *sql.Tx, id string) error
+}
+
 // ExternalIdentityStore defines persistence operations for the mapping
 // between users and their identifiers in external systems.
 // UNIQUE (source, external_id) is enforced at the schema level.
@@ -251,6 +301,9 @@ type ExternalIdentityStore interface {
        // Update replaces mutable fields of an existing external identity 
within
        // the provided transaction.
        Update(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) 
error
+       // ReassignUser re-points every external identity from fromUserID to
+       // toUserID.
+       ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID 
string) error
        // Delete removes an external identity by ID within the provided 
transaction.
        Delete(ctx context.Context, tx *sql.Tx, id string) error
 }
@@ -268,6 +321,8 @@ type UserDNStore interface {
        FindByUser(ctx context.Context, userID string) ([]models.UserDN, error)
        // Create inserts a new DN binding within the provided transaction.
        Create(ctx context.Context, tx *sql.Tx, d *models.UserDN) error
+       // ReassignUser re-points every DN binding from fromUserID to toUserID.
+       ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID 
string) error
        // Delete removes a DN binding by ID within the provided transaction.
        Delete(ctx context.Context, tx *sql.Tx, id string) error
 }
diff --git a/internal/store/user_dn_store.go b/internal/store/user_dn_store.go
index 656413572..bb0a73741 100644
--- a/internal/store/user_dn_store.go
+++ b/internal/store/user_dn_store.go
@@ -82,6 +82,21 @@ func (s *mysqlUserDNStore) Create(ctx context.Context, tx 
*sql.Tx, d *models.Use
        return err
 }
 
+func (s *mysqlUserDNStore) ReassignUser(ctx context.Context, tx *sql.Tx, 
fromUserID, toUserID string) error {
+       // Drop fromUserID's rows whose dn the survivor already holds, then 
move the rest.
+       if _, err := tx.ExecContext(ctx,
+               `DELETE FROM user_dns
+                WHERE user_id = ?
+                  AND dn IN (SELECT dn FROM (SELECT dn FROM user_dns WHERE 
user_id = ?) AS s)`,
+               fromUserID, toUserID); err != nil {
+               return err
+       }
+       _, err := tx.ExecContext(ctx,
+               `UPDATE user_dns SET user_id = ? WHERE user_id = ?`,
+               toUserID, fromUserID)
+       return err
+}
+
 func (s *mysqlUserDNStore) Delete(ctx context.Context, tx *sql.Tx, id string) 
error {
        _, err := tx.ExecContext(ctx, `DELETE FROM user_dns WHERE id = ?`, id)
        return err
diff --git a/internal/store/user_merge_store.go 
b/internal/store/user_merge_store.go
new file mode 100644
index 000000000..827462e55
--- /dev/null
+++ b/internal/store/user_merge_store.go
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+
+       "github.com/jmoiron/sqlx"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+type mysqlUserMergeStore struct {
+       db *sqlx.DB
+}
+
+// NewUserMergeStore returns a MySQL-backed UserMergeStore.
+func NewUserMergeStore(db *sqlx.DB) UserMergeStore {
+       return &mysqlUserMergeStore{db: db}
+}
+
+const userMergeColumns = `id, retiring_user_id, surviving_user_id, 
COALESCE(reason, '') AS reason, merged_at`
+
+func (s *mysqlUserMergeStore) Record(ctx context.Context, tx *sql.Tx, 
retiringUserID, survivingUserID, reason string) error {
+       var reasonArg any
+       if reason == "" {
+               reasonArg = nil
+       } else {
+               reasonArg = reason
+       }
+       _, err := tx.ExecContext(ctx,
+               `INSERT INTO user_merges (retiring_user_id, surviving_user_id, 
reason)
+                VALUES (?, ?, ?)`,
+               retiringUserID, survivingUserID, reasonArg)
+       return err
+}
+
+func (s *mysqlUserMergeStore) FindByRetiringUser(ctx context.Context, 
retiringUserID string) (*models.UserMerge, error) {
+       var m models.UserMerge
+       err := s.db.GetContext(ctx, &m,
+               `SELECT `+userMergeColumns+` FROM user_merges WHERE 
retiring_user_id = ?`, retiringUserID)
+       if err != nil {
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, nil
+               }
+               return nil, err
+       }
+       return &m, nil
+}
+
+func (s *mysqlUserMergeStore) FindBySurvivingUser(ctx context.Context, 
survivingUserID string) ([]models.UserMerge, error) {
+       var out []models.UserMerge
+       err := s.db.SelectContext(ctx, &out,
+               `SELECT `+userMergeColumns+` FROM user_merges WHERE 
surviving_user_id = ? ORDER BY merged_at ASC`,
+               survivingUserID)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
diff --git a/internal/store/user_store.go b/internal/store/user_store.go
index eda2b749f..30659283b 100644
--- a/internal/store/user_store.go
+++ b/internal/store/user_store.go
@@ -27,6 +27,8 @@ import (
        "github.com/apache/airavata-custos/pkg/models"
 )
 
+const userColumns = `id, organization_id, first_name, last_name, middle_name, 
email, status`
+
 type mysqlUserStore struct {
        db *sqlx.DB
 }
@@ -39,8 +41,7 @@ func NewUserStore(db *sqlx.DB) UserStore {
 func (s *mysqlUserStore) FindByID(ctx context.Context, id string) 
(*models.User, error) {
        var u models.User
        err := s.db.GetContext(ctx, &u,
-               `SELECT id, organization_id, first_name, last_name, 
middle_name, email
-                FROM users WHERE id = ?`, id)
+               `SELECT `+userColumns+` FROM users WHERE id = ?`, id)
        if err != nil {
                if errors.Is(err, sql.ErrNoRows) {
                        return nil, nil
@@ -53,8 +54,7 @@ func (s *mysqlUserStore) FindByID(ctx context.Context, id 
string) (*models.User,
 func (s *mysqlUserStore) FindByEmail(ctx context.Context, email string) 
(*models.User, error) {
        var u models.User
        err := s.db.GetContext(ctx, &u,
-               `SELECT id, organization_id, first_name, last_name, 
middle_name, email
-                FROM users WHERE email = ?`, email)
+               `SELECT `+userColumns+` FROM users WHERE email = ?`, email)
        if err != nil {
                if errors.Is(err, sql.ErrNoRows) {
                        return nil, nil
@@ -67,8 +67,7 @@ func (s *mysqlUserStore) FindByEmail(ctx context.Context, 
email string) (*models
 func (s *mysqlUserStore) FindByOrganization(ctx context.Context, 
organizationID string) ([]models.User, error) {
        var users []models.User
        err := s.db.SelectContext(ctx, &users,
-               `SELECT id, organization_id, first_name, last_name, 
middle_name, email
-                FROM users WHERE organization_id = ?`, organizationID)
+               `SELECT `+userColumns+` FROM users WHERE organization_id = ?`, 
organizationID)
        if err != nil {
                return nil, err
        }
@@ -77,9 +76,9 @@ func (s *mysqlUserStore) FindByOrganization(ctx 
context.Context, organizationID
 
 func (s *mysqlUserStore) Create(ctx context.Context, tx *sql.Tx, u 
*models.User) error {
        _, err := tx.ExecContext(ctx,
-               `INSERT INTO users (id, organization_id, first_name, last_name, 
middle_name, email)
-                VALUES (?, ?, ?, ?, ?, ?)`,
-               u.ID, u.OrganizationID, u.FirstName, u.LastName, u.MiddleName, 
u.Email)
+               `INSERT INTO users (id, organization_id, first_name, last_name, 
middle_name, email, status)
+                VALUES (?, ?, ?, ?, ?, ?, ?)`,
+               u.ID, u.OrganizationID, u.FirstName, u.LastName, u.MiddleName, 
u.Email, u.Status)
        return err
 }
 
@@ -91,6 +90,13 @@ func (s *mysqlUserStore) Update(ctx context.Context, tx 
*sql.Tx, u *models.User)
        return err
 }
 
+func (s *mysqlUserStore) UpdateStatus(ctx context.Context, tx *sql.Tx, id 
string, status models.UserStatus) error {
+       _, err := tx.ExecContext(ctx,
+               `UPDATE users SET status = ? WHERE id = ?`,
+               status, id)
+       return err
+}
+
 func (s *mysqlUserStore) Delete(ctx context.Context, tx *sql.Tx, id string) 
error {
        _, err := tx.ExecContext(ctx, `DELETE FROM users WHERE id = ?`, id)
        return err
diff --git a/pkg/events/cluster_account_subscribe.go 
b/pkg/events/cluster_account_subscribe.go
new file mode 100644
index 000000000..cdb26f97e
--- /dev/null
+++ b/pkg/events/cluster_account_subscribe.go
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package events
+
+import (
+       "log/slog"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// ClusterAccountHandler handles cluster account lifecycle events with a typed 
payload.
+type ClusterAccountHandler func(a models.ClusterAccount)
+
+// SubscribeClusterAccountCreated registers a typed handler invoked whenever a
+// cluster_account::create event is published.
+func (b *Bus) SubscribeClusterAccountCreated(handler ClusterAccountHandler) {
+       b.subscribeClusterAccount(ClusterAccountCreateEvent, handler)
+}
+
+// SubscribeClusterAccountUpdated registers a typed handler invoked whenever a
+// cluster_account::update event is published.
+func (b *Bus) SubscribeClusterAccountUpdated(handler ClusterAccountHandler) {
+       b.subscribeClusterAccount(ClusterAccountUpdateEvent, handler)
+}
+
+// SubscribeClusterAccountDeleted registers a typed handler invoked whenever a
+// cluster_account::delete event is published.
+func (b *Bus) SubscribeClusterAccountDeleted(handler ClusterAccountHandler) {
+       b.subscribeClusterAccount(ClusterAccountDeleteEvent, handler)
+}
+
+func (b *Bus) subscribeClusterAccount(topic EventType, handler 
ClusterAccountHandler) {
+       b.Subscribe(topic, func(event Event, value interface{}) {
+               switch a := value.(type) {
+               case models.ClusterAccount:
+                       handler(a)
+               case *models.ClusterAccount:
+                       if a != nil {
+                               handler(*a)
+                       }
+               default:
+                       slog.Warn("cluster account event payload has unexpected 
type",
+                               "type", event.Type,
+                               "got", value,
+                       )
+               }
+       })
+}
diff --git a/pkg/events/external_identity_subscribe.go 
b/pkg/events/external_identity_subscribe.go
index 27fd2f498..83c53aa38 100644
--- a/pkg/events/external_identity_subscribe.go
+++ b/pkg/events/external_identity_subscribe.go
@@ -1,3 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 package events
 
 import (
diff --git a/pkg/events/user_dn_subscribe.go b/pkg/events/user_dn_subscribe.go
index 67b14fd33..ba6f7723b 100644
--- a/pkg/events/user_dn_subscribe.go
+++ b/pkg/events/user_dn_subscribe.go
@@ -1,3 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 package events
 
 import (
diff --git a/pkg/models/allocation.go b/pkg/models/allocation.go
index 1037b1d7f..3b26f615c 100644
--- a/pkg/models/allocation.go
+++ b/pkg/models/allocation.go
@@ -15,6 +15,17 @@ type ComputeCluster struct {
        Name string `json:"name" db:"name"` // A human-readable name for the 
compute cluster, e.g., "Cluster A", "Cluster B", etc.
 }
 
+// ClusterAccount represents the provisioned posix account a user holds on a
+// specific compute cluster (e.g. username "alice123" on cluster "abc").
+// The (compute_cluster_id, username) pair is unique.
+type ClusterAccount struct {
+       ID               string           `json:"id"                 db:"id"`
+       UserID           string           `json:"user_id"            
db:"user_id"`
+       ComputeClusterID string           `json:"compute_cluster_id" 
db:"compute_cluster_id"`
+       Username         string           `json:"username"           
db:"username"`
+       Status           AllocationStatus `json:"status"             
db:"status"` // ACTIVE, INACTIVE, DELETED
+}
+
 type ComputeAllocation struct {
        ID               string           `json:"id"                 db:"id"`
        ProjectID        string           `json:"project_id"         
db:"project_id"`
diff --git a/pkg/models/project.go b/pkg/models/project.go
index d6325959d..879f57841 100644
--- a/pkg/models/project.go
+++ b/pkg/models/project.go
@@ -3,12 +3,13 @@ package models
 import "time"
 
 type Project struct {
-       ID           string    `json:"id"            db:"id"`
-       OriginatedID string    `json:"originated_id" db:"originated_id"` // The 
ID of the project in origination. For example: ACCESS Record ID.
-       Title        string    `json:"title"         db:"title"`
-       Origination  string    `json:"origination"   db:"origination"` // 
ACCESS, NAIRR, XRASS, etc.
-       ProjectPIID  string    `json:"project_pi_id" db:"project_pi_id"`
-       CreatedTime  time.Time `json:"created_time"  db:"created_time"`
+       ID           string           `json:"id"            db:"id"`
+       OriginatedID string           `json:"originated_id" db:"originated_id"` 
// The ID of the project in origination. For example: ACCESS Record ID.
+       Title        string           `json:"title"         db:"title"`
+       Origination  string           `json:"origination"   db:"origination"` 
// ACCESS, NAIRR, XRASS, etc.
+       ProjectPIID  string           `json:"project_pi_id" db:"project_pi_id"`
+       Status       AllocationStatus `json:"status"        db:"status"` // 
ACTIVE, INACTIVE, DELETED
+       CreatedTime  time.Time        `json:"created_time"  db:"created_time"`
 }
 
 type Organization struct {
@@ -17,11 +18,34 @@ type Organization struct {
        Name         string `json:"name"          db:"name"`
 }
 
+type UserStatus string
+
+const (
+       UserActive   UserStatus = "ACTIVE"
+       UserInactive UserStatus = "INACTIVE"
+       // UserMerged marks the retiring user after a Service.MergeUsers call.
+       // The row is kept as a mapping so historical references stay 
resolvable, the
+       // linkage to the surviving user lives in the user_merges table.
+       UserMerged UserStatus = "MERGED"
+)
+
 type User struct {
-       ID             string `json:"id"              db:"id"`
-       OrganizationID string `json:"organization_id" db:"organization_id"`
-       FirstName      string `json:"first_name"      db:"first_name"`
-       LastName       string `json:"last_name"       db:"last_name"`
-       MiddleName     string `json:"middle_name,omitempty" db:"middle_name"`
-       Email          string `json:"email"           db:"email"`
+       ID             string     `json:"id"                    db:"id"`
+       OrganizationID string     `json:"organization_id"       
db:"organization_id"`
+       FirstName      string     `json:"first_name"            db:"first_name"`
+       LastName       string     `json:"last_name"             db:"last_name"`
+       MiddleName     string     `json:"middle_name,omitempty" 
db:"middle_name"`
+       Email          string     `json:"email"                 db:"email"`
+       Status         UserStatus `json:"status"                db:"status"`
+}
+
+// UserMerge records the linkage produced by Service.MergeUsers. The retiring
+// user's row is kept (with status=MERGED), so historical references remain
+// resolvable.
+type UserMerge struct {
+       ID              int64     `json:"id"                 db:"id"`
+       RetiringUserID  string    `json:"retiring_user_id"   
db:"retiring_user_id"`
+       SurvivingUserID string    `json:"surviving_user_id"  
db:"surviving_user_id"`
+       Reason          string    `json:"reason,omitempty"   db:"reason"`
+       MergedAt        time.Time `json:"merged_at"          db:"merged_at"`
 }
diff --git a/pkg/service/cluster_account.go b/pkg/service/cluster_account.go
new file mode 100644
index 000000000..a19bad270
--- /dev/null
+++ b/pkg/service/cluster_account.go
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package service
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+
+       "github.com/apache/airavata-custos/pkg/events"
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// CreateClusterAccount persists a new cluster account for a user on a cluster.
+// If account.ID is empty, a new UUID is generated. The referenced user and
+// cluster must exist; (compute_cluster_id, username) is unique.
+func (s *Service) CreateClusterAccount(ctx context.Context, account 
*models.ClusterAccount) (*models.ClusterAccount, error) {
+       if account == nil {
+               return nil, fmt.Errorf("%w: cluster account is nil", 
ErrInvalidInput)
+       }
+       if account.UserID == "" {
+               return nil, fmt.Errorf("%w: cluster account user_id is 
required", ErrInvalidInput)
+       }
+       if account.ComputeClusterID == "" {
+               return nil, fmt.Errorf("%w: cluster account compute_cluster_id 
is required", ErrInvalidInput)
+       }
+       if account.Username == "" {
+               return nil, fmt.Errorf("%w: cluster account username is 
required", ErrInvalidInput)
+       }
+
+       if user, err := s.users.FindByID(ctx, account.UserID); err != nil {
+               return nil, fmt.Errorf("verify user: %w", err)
+       } else if user == nil {
+               return nil, fmt.Errorf("%w: user %q does not exist", 
ErrInvalidInput, account.UserID)
+       }
+       if cluster, err := s.clusters.FindByID(ctx, account.ComputeClusterID); 
err != nil {
+               return nil, fmt.Errorf("verify compute cluster: %w", err)
+       } else if cluster == nil {
+               return nil, fmt.Errorf("%w: compute cluster %q does not exist", 
ErrInvalidInput, account.ComputeClusterID)
+       }
+
+       if existing, err := s.clusterAccounts.FindByClusterAndUsername(ctx, 
account.ComputeClusterID, account.Username); err != nil {
+               return nil, fmt.Errorf("lookup cluster account: %w", err)
+       } else if existing != nil {
+               return nil, fmt.Errorf("%w: cluster account %q on cluster %q", 
ErrAlreadyExists, account.Username, account.ComputeClusterID)
+       }
+
+       if account.ID == "" {
+               account.ID = newID()
+       }
+       if account.Status == "" {
+               account.Status = models.ACTIVE
+       }
+
+       if err := s.inTx(ctx, func(tx *sql.Tx) error {
+               return s.clusterAccounts.Create(ctx, tx, account)
+       }); err != nil {
+               return nil, fmt.Errorf("create cluster account: %w", err)
+       }
+
+       s.eventBus.Publish(events.ClusterAccountCreateEvent, account)
+       return account, nil
+}
+
+// GetClusterAccount retrieves a cluster account by ID.
+func (s *Service) GetClusterAccount(ctx context.Context, id string) 
(*models.ClusterAccount, error) {
+       a, err := s.clusterAccounts.FindByID(ctx, id)
+       if err != nil {
+               return nil, fmt.Errorf("get cluster account: %w", err)
+       }
+       if a == nil {
+               return nil, ErrNotFound
+       }
+       return a, nil
+}
+
+// GetClusterAccountByClusterAndUsername resolves a cluster account by its
+// natural key.
+func (s *Service) GetClusterAccountByClusterAndUsername(ctx context.Context, 
clusterID, username string) (*models.ClusterAccount, error) {
+       a, err := s.clusterAccounts.FindByClusterAndUsername(ctx, clusterID, 
username)
+       if err != nil {
+               return nil, fmt.Errorf("get cluster account by 
cluster/username: %w", err)
+       }
+       if a == nil {
+               return nil, ErrNotFound
+       }
+       return a, nil
+}
+
+// ListClusterAccountsForUser returns every cluster account belonging to a 
user.
+func (s *Service) ListClusterAccountsForUser(ctx context.Context, userID 
string) ([]models.ClusterAccount, error) {
+       out, err := s.clusterAccounts.FindByUser(ctx, userID)
+       if err != nil {
+               return nil, fmt.Errorf("list cluster accounts by user: %w", err)
+       }
+       return out, nil
+}
+
+// ListClusterAccountsForCluster returns every cluster account on a cluster.
+func (s *Service) ListClusterAccountsForCluster(ctx context.Context, clusterID 
string) ([]models.ClusterAccount, error) {
+       out, err := s.clusterAccounts.FindByCluster(ctx, clusterID)
+       if err != nil {
+               return nil, fmt.Errorf("list cluster accounts by cluster: %w", 
err)
+       }
+       return out, nil
+}
+
+// UpdateClusterAccountStatus flips only the lifecycle status.
+func (s *Service) UpdateClusterAccountStatus(ctx context.Context, id string, 
status models.AllocationStatus) (*models.ClusterAccount, error) {
+       if id == "" {
+               return nil, fmt.Errorf("%w: cluster account id is required", 
ErrInvalidInput)
+       }
+       switch status {
+       case models.ACTIVE, models.INACTIVE, models.DELETED:
+       default:
+               return nil, fmt.Errorf("%w: invalid cluster account status %q", 
ErrInvalidInput, status)
+       }
+
+       a, err := s.clusterAccounts.FindByID(ctx, id)
+       if err != nil {
+               return nil, fmt.Errorf("lookup cluster account: %w", err)
+       }
+       if a == nil {
+               return nil, ErrNotFound
+       }
+       if a.Status == status {
+               return a, nil
+       }
+
+       if err := s.inTx(ctx, func(tx *sql.Tx) error {
+               return s.clusterAccounts.UpdateStatus(ctx, tx, id, status)
+       }); err != nil {
+               return nil, fmt.Errorf("update cluster account status: %w", err)
+       }
+       a.Status = status
+
+       s.eventBus.Publish(events.ClusterAccountUpdateEvent, a)
+       return a, nil
+}
+
+// DeleteClusterAccount removes a cluster account by ID.
+func (s *Service) DeleteClusterAccount(ctx context.Context, id string) error {
+       if id == "" {
+               return fmt.Errorf("%w: cluster account id is required", 
ErrInvalidInput)
+       }
+       a, err := s.clusterAccounts.FindByID(ctx, id)
+       if err != nil {
+               return fmt.Errorf("lookup cluster account: %w", err)
+       }
+       if a == nil {
+               return ErrNotFound
+       }
+       if err := s.inTx(ctx, func(tx *sql.Tx) error {
+               return s.clusterAccounts.Delete(ctx, tx, id)
+       }); err != nil {
+               return fmt.Errorf("delete cluster account: %w", err)
+       }
+
+       s.eventBus.Publish(events.ClusterAccountDeleteEvent, a)
+       return nil
+}
diff --git a/pkg/service/project.go b/pkg/service/project.go
index 938ff4f8e..c08150a2c 100644
--- a/pkg/service/project.go
+++ b/pkg/service/project.go
@@ -56,6 +56,9 @@ func (s *Service) CreateProject(ctx context.Context, project 
*models.Project) (*
        if project.ID == "" {
                project.ID = newID()
        }
+       if project.Status == "" {
+               project.Status = models.ACTIVE
+       }
        if project.CreatedTime.IsZero() {
                project.CreatedTime = nowUTC()
        }
@@ -118,6 +121,39 @@ func (s *Service) UpdateProject(ctx context.Context, 
project *models.Project) er
        return nil
 }
 
+// UpdateProjectStatus flips only the project's lifecycle status.
+func (s *Service) UpdateProjectStatus(ctx context.Context, id string, status 
models.AllocationStatus) (*models.Project, error) {
+       if id == "" {
+               return nil, fmt.Errorf("%w: project id is required", 
ErrInvalidInput)
+       }
+       switch status {
+       case models.ACTIVE, models.INACTIVE, models.DELETED:
+       default:
+               return nil, fmt.Errorf("%w: invalid project status %q", 
ErrInvalidInput, status)
+       }
+
+       project, err := s.projs.FindByID(ctx, id)
+       if err != nil {
+               return nil, fmt.Errorf("lookup project: %w", err)
+       }
+       if project == nil {
+               return nil, ErrNotFound
+       }
+       if project.Status == status {
+               return project, nil
+       }
+
+       if err := s.inTx(ctx, func(tx *sql.Tx) error {
+               return s.projs.UpdateStatus(ctx, tx, id, status)
+       }); err != nil {
+               return nil, fmt.Errorf("update project status: %w", err)
+       }
+       project.Status = status
+
+       s.eventBus.Publish(events.ProjectUpdateEvent, project)
+       return project, nil
+}
+
 // DeleteProject removes a project by ID.
 func (s *Service) DeleteProject(ctx context.Context, id string) error {
        if id == "" {
diff --git a/pkg/service/service.go b/pkg/service/service.go
index aa9538a52..d361d4af2 100644
--- a/pkg/service/service.go
+++ b/pkg/service/service.go
@@ -51,6 +51,8 @@ type Service struct {
        usages           store.ComputeAllocationUsageStore
        extIDs           store.ExternalIdentityStore
        userDNs          store.UserDNStore
+       clusterAccounts  store.ClusterAccountStore
+       userMerges       store.UserMergeStore
 }
 
 // New constructs a Service backed by the supplied database handle.
@@ -74,6 +76,8 @@ func New(database *sqlx.DB, eventBus *events.Bus) *Service {
                usages:           
store.NewComputeAllocationUsageStore(database),
                extIDs:           store.NewExternalIdentityStore(database),
                userDNs:          store.NewUserDNStore(database),
+               clusterAccounts:  store.NewClusterAccountStore(database),
+               userMerges:       store.NewUserMergeStore(database),
        }
 }
 
@@ -98,6 +102,8 @@ func NewWithStores(
        usages store.ComputeAllocationUsageStore,
        extIDs store.ExternalIdentityStore,
        userDNs store.UserDNStore,
+       clusterAccounts store.ClusterAccountStore,
+       userMerges store.UserMergeStore,
 ) *Service {
        return &Service{
                db:               database,
@@ -117,6 +123,8 @@ func NewWithStores(
                usages:           usages,
                extIDs:           extIDs,
                userDNs:          userDNs,
+               clusterAccounts:  clusterAccounts,
+               userMerges:       userMerges,
        }
 }
 
diff --git a/pkg/service/user.go b/pkg/service/user.go
index da809881c..d7371979b 100644
--- a/pkg/service/user.go
+++ b/pkg/service/user.go
@@ -53,6 +53,9 @@ func (s *Service) CreateUser(ctx context.Context, user 
*models.User) (*models.Us
        if user.ID == "" {
                user.ID = newID()
        }
+       if user.Status == "" {
+               user.Status = models.UserActive
+       }
 
        if err := s.inTx(ctx, func(tx *sql.Tx) error {
                return s.users.Create(ctx, tx, user)
@@ -74,6 +77,27 @@ func (s *Service) GetUser(ctx context.Context, id string) 
(*models.User, error)
        return u, nil
 }
 
+// GetUserByExternalIdentity resolves a user via their (source, external_id)
+// entry. Returns ErrNotFound when either the external identity does not
+// exist or the user it points to has been deleted.
+func (s *Service) GetUserByExternalIdentity(ctx context.Context, source, 
externalID string) (*models.User, error) {
+       ext, err := s.extIDs.FindBySourceAndExternalID(ctx, source, externalID)
+       if err != nil {
+               return nil, fmt.Errorf("lookup external identity: %w", err)
+       }
+       if ext == nil {
+               return nil, ErrNotFound
+       }
+       u, err := s.users.FindByID(ctx, ext.UserID)
+       if err != nil {
+               return nil, fmt.Errorf("lookup user: %w", err)
+       }
+       if u == nil {
+               return nil, ErrNotFound
+       }
+       return u, nil
+}
+
 // GetUserByEmail retrieves a user by email.
 func (s *Service) GetUserByEmail(ctx context.Context, email string) 
(*models.User, error) {
        u, err := s.users.FindByEmail(ctx, email)
diff --git a/pkg/service/user_merge.go b/pkg/service/user_merge.go
new file mode 100644
index 000000000..661bc6d34
--- /dev/null
+++ b/pkg/service/user_merge.go
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package service
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+
+       "github.com/apache/airavata-custos/pkg/events"
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// MergeUsers consolidates the retiring user into the surviving user. All
+// identity-forward state moves to the survivor. Historical state stays with 
no change.
+//
+// Moved to survivor (duplicates on the retiring user are
+// dropped first, then the remainder is re-pointed):
+//   - external_identities
+//   - user_dns
+//   - cluster_accounts
+//   - projects.project_pi_id
+//   - compute_allocation_memberships
+//
+// Left in place (historical truth — who actually did the thing):
+//   - compute_allocation_change_requests (requester / approver)
+//   - compute_allocation_usages
+//
+// The retiring user row is flipped to status=MERGED (soft-delete) and a row
+// is written to user_merges with the surviving user and the given reason.
+// Historical references to the retiring user remain
+// resolvable. The linkage lives in user_merges table.
+// All work happens in a single transaction.
+func (s *Service) MergeUsers(ctx context.Context, survivingID, retiringID, 
reason string) (*models.User, error) {
+       if survivingID == "" || retiringID == "" {
+               return nil, fmt.Errorf("%w: surviving and retiring user IDs are 
required", ErrInvalidInput)
+       }
+       if survivingID == retiringID {
+               return nil, fmt.Errorf("%w: cannot merge a user with itself", 
ErrInvalidInput)
+       }
+
+       survivor, err := s.users.FindByID(ctx, survivingID)
+       if err != nil {
+               return nil, fmt.Errorf("lookup surviving user: %w", err)
+       }
+       if survivor == nil {
+               return nil, fmt.Errorf("%w: surviving user %q does not exist", 
ErrInvalidInput, survivingID)
+       }
+       retiring, err := s.users.FindByID(ctx, retiringID)
+       if err != nil {
+               return nil, fmt.Errorf("lookup retiring user: %w", err)
+       }
+       if retiring == nil {
+               return nil, fmt.Errorf("%w: retiring user %q does not exist", 
ErrInvalidInput, retiringID)
+       }
+
+       if err := s.inTx(ctx, func(tx *sql.Tx) error {
+               if err := s.extIDs.ReassignUser(ctx, tx, retiringID, 
survivingID); err != nil {
+                       return fmt.Errorf("reassign external identities: %w", 
err)
+               }
+               if err := s.userDNs.ReassignUser(ctx, tx, retiringID, 
survivingID); err != nil {
+                       return fmt.Errorf("reassign user dns: %w", err)
+               }
+               if err := s.clusterAccounts.ReassignUser(ctx, tx, retiringID, 
survivingID); err != nil {
+                       return fmt.Errorf("reassign cluster accounts: %w", err)
+               }
+               if err := s.projs.ReassignPI(ctx, tx, retiringID, survivingID); 
err != nil {
+                       return fmt.Errorf("reassign project PI: %w", err)
+               }
+               if err := s.memberships.ReassignUser(ctx, tx, retiringID, 
survivingID); err != nil {
+                       return fmt.Errorf("reassign memberships: %w", err)
+               }
+               if err := s.users.UpdateStatus(ctx, tx, retiringID, 
models.UserMerged); err != nil {
+                       return fmt.Errorf("mark retiring user merged: %w", err)
+               }
+               return s.userMerges.Record(ctx, tx, retiringID, survivingID, 
reason)
+       }); err != nil {
+               return nil, fmt.Errorf("merge users: %w", err)
+       }
+
+       retiring.Status = models.UserMerged
+       s.eventBus.Publish(events.UserUpdateEvent, retiring)
+       s.eventBus.Publish(events.UserUpdateEvent, survivor)
+       return survivor, nil
+}

Reply via email to