This is an automated email from the ASF dual-hosted git repository.
lahirujayathilake pushed a commit to branch allocation-management
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
The following commit(s) were added to refs/heads/allocation-management by this
push:
new c703596f0 Implement allocation hierarchy in AMIE handler
c703596f0 is described below
commit c703596f0a2f75ab2847ffb5b8cacfda4f7eac11
Author: lahiruj <[email protected]>
AuthorDate: Tue May 12 21:36:26 2026 -0400
Implement allocation hierarchy in AMIE handler
---
.../db/migrations/000001_initial_schema.down.sql | 8 +
.../db/migrations/000001_initial_schema.up.sql | 100 ++++++
.../dev-seed.sql} | 17 +-
connectors/ACCESS/AMIE-Processor/go.mod | 1 +
connectors/ACCESS/AMIE-Processor/go.sum | 2 +
.../handler/request_project_create.go | 22 ++
.../handler/request_project_create_test.go | 40 ++-
connectors/ACCESS/AMIE-Processor/main.go | 8 +-
.../ACCESS/AMIE-Processor/model/allocation.go | 48 +++
.../AMIE-Processor/model/allocation_source.go | 34 +++
connectors/ACCESS/AMIE-Processor/model/award.go | 56 ++++
.../ACCESS/AMIE-Processor/model/credit_transfer.go | 51 ++++
.../ACCESS/AMIE-Processor/model/resource_type.go | 39 +++
.../AMIE-Processor/service/allocation_service.go | 334 +++++++++++++++++++++
.../store/allocation_source_store.go | 51 ++++
.../AMIE-Processor/store/allocation_store.go | 87 ++++++
.../ACCESS/AMIE-Processor/store/award_store.go | 101 +++++++
.../AMIE-Processor/store/credit_transfer_store.go | 55 ++++
.../AMIE-Processor/store/resource_type_store.go | 79 +++++
connectors/ACCESS/AMIE-Processor/store/stores.go | 35 +++
core/domain/model/audit_log.go | 3 +
21 files changed, 1157 insertions(+), 14 deletions(-)
diff --git
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
b/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
index 88d0ffac7..17828bedf 100644
---
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
+++
b/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
@@ -15,6 +15,14 @@
-- specific language governing permissions and limitations
-- under the License.
+DROP TRIGGER IF EXISTS credit_transfers_no_delete;
+DROP TRIGGER IF EXISTS credit_transfers_no_update;
+DROP TABLE IF EXISTS credit_transfers;
+DROP TABLE IF EXISTS allocations;
+DROP TABLE IF EXISTS awards;
+DROP TABLE IF EXISTS resource_types;
+DROP TABLE IF EXISTS allocation_sources;
+
DROP TABLE IF EXISTS amie_processing_errors;
DROP TABLE IF EXISTS amie_processing_events;
DROP TABLE IF EXISTS amie_packets;
diff --git
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.up.sql
b/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.up.sql
index a63569421..8075bfa10 100644
---
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.up.sql
+++
b/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.up.sql
@@ -115,3 +115,103 @@ CREATE TABLE IF NOT EXISTS amie_processing_errors
KEY idx_errors_amie_event_id (event_id),
KEY idx_errors_amie_occurred_at (occurred_at)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+-- ============================================================
+-- Allocation hierarchy: registries, awards, allocations, ledger
+-- ============================================================
+
+CREATE TABLE IF NOT EXISTS allocation_sources
+(
+ id VARCHAR(255) NOT NULL,
+ name VARCHAR(64) NOT NULL,
+ display_name VARCHAR(128) NOT NULL,
+ adapter_type VARCHAR(64) NOT NULL,
+ is_active BOOLEAN NOT NULL DEFAULT TRUE,
+ created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+ updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE
CURRENT_TIMESTAMP(6),
+ PRIMARY KEY (id),
+ UNIQUE KEY uq_alloc_sources_name (name)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+CREATE TABLE IF NOT EXISTS resource_types
+(
+ id VARCHAR(255) NOT NULL,
+ name VARCHAR(64) NOT NULL,
+ display_name VARCHAR(128) NOT NULL,
+ unit VARCHAR(32) NOT NULL,
+ slurm_tres VARCHAR(128) NULL,
+ is_active BOOLEAN NOT NULL DEFAULT TRUE,
+ created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+ updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE
CURRENT_TIMESTAMP(6),
+ PRIMARY KEY (id),
+ UNIQUE KEY uq_resource_types_name (name)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+CREATE TABLE IF NOT EXISTS awards
+(
+ id VARCHAR(255) NOT NULL,
+ project_id VARCHAR(255) NOT NULL,
+ source_id VARCHAR(255) NOT NULL,
+ parent_award_id VARCHAR(255) NULL,
+ external_id VARCHAR(255) NULL,
+ total_credits DECIMAL(18,6) NOT NULL,
+ category VARCHAR(64) NOT NULL DEFAULT 'new',
+ start_date DATE NOT NULL,
+ end_date DATE NOT NULL,
+ status VARCHAR(32) NOT NULL DEFAULT 'active',
+ created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+ updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON
UPDATE CURRENT_TIMESTAMP(6),
+ PRIMARY KEY (id),
+ CONSTRAINT fk_awards_project FOREIGN KEY (project_id) REFERENCES projects
(id),
+ CONSTRAINT fk_awards_source FOREIGN KEY (source_id) REFERENCES
allocation_sources (id),
+ CONSTRAINT fk_awards_parent FOREIGN KEY (parent_award_id) REFERENCES
awards (id),
+ KEY idx_awards_project (project_id),
+ KEY idx_awards_status (status),
+ KEY idx_awards_parent (parent_award_id)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+CREATE TABLE IF NOT EXISTS allocations
+(
+ id VARCHAR(255) NOT NULL,
+ award_id VARCHAR(255) NOT NULL,
+ resource_type_id VARCHAR(255) NOT NULL,
+ source_credits DECIMAL(18,6) NOT NULL,
+ balance_credits DECIMAL(18,6) NOT NULL DEFAULT 0,
+ start_date DATE NOT NULL,
+ end_date DATE NOT NULL,
+ status VARCHAR(32) NOT NULL DEFAULT 'active',
+ slurm_account VARCHAR(255) NULL,
+ created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+ updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON
UPDATE CURRENT_TIMESTAMP(6),
+ PRIMARY KEY (id),
+ CONSTRAINT fk_alloc_award FOREIGN KEY (award_id) REFERENCES
awards (id),
+ CONSTRAINT fk_alloc_resource FOREIGN KEY (resource_type_id) REFERENCES
resource_types (id),
+ UNIQUE KEY uq_alloc_slurm_account (slurm_account),
+ KEY idx_alloc_award (award_id),
+ KEY idx_alloc_status (status)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+CREATE TABLE IF NOT EXISTS credit_transfers
+(
+ id VARCHAR(255) NOT NULL,
+ allocation_id VARCHAR(255) NOT NULL,
+ transfer_type VARCHAR(32) NOT NULL,
+ amount DECIMAL(18,6) NOT NULL,
+ balance_after DECIMAL(18,6) NOT NULL,
+ reference_type VARCHAR(64) NULL,
+ reference_id VARCHAR(255) NULL,
+ description VARCHAR(512) NULL,
+ performed_by_id VARCHAR(255) NULL,
+ created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+ PRIMARY KEY (id),
+ CONSTRAINT fk_ct_allocation FOREIGN KEY (allocation_id) REFERENCES
allocations (id),
+ CONSTRAINT fk_ct_person FOREIGN KEY (performed_by_id) REFERENCES
persons (id),
+ CONSTRAINT chk_ct_type CHECK (transfer_type IN ('GRANT','USAGE_CHARGE','ADJUSTMENT','EXPIRATION')),
+ KEY idx_ct_allocation (allocation_id),
+ KEY idx_ct_alloc_type (allocation_id, transfer_type),
+ KEY idx_ct_created_at (created_at)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+CREATE TRIGGER credit_transfers_no_update BEFORE UPDATE ON credit_transfers FOR EACH ROW SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'credit_transfers is append-only; updates are not permitted';
+
+CREATE TRIGGER credit_transfers_no_delete BEFORE DELETE ON credit_transfers FOR EACH ROW SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'credit_transfers is append-only; deletes are not permitted';
diff --git
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
b/connectors/ACCESS/AMIE-Processor/db/seed/dev-seed.sql
similarity index 57%
copy from
connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
copy to connectors/ACCESS/AMIE-Processor/db/seed/dev-seed.sql
index 88d0ffac7..f59fb301e 100644
---
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
+++ b/connectors/ACCESS/AMIE-Processor/db/seed/dev-seed.sql
@@ -14,10 +14,15 @@
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
+--
+-- Dev / Nexus site seed data. Apply after migrations on a fresh database:
+-- mysql -u admin -padmin access_ci < db/seed/dev-seed.sql
+-- Production deployments should provide their own seed reflecting the site's
+-- allocation sources and the GPU / accelerator types actually installed.
+
+INSERT INTO allocation_sources (id, name, display_name, adapter_type) VALUES
+ (UUID(), 'access', 'ACCESS-CI', 'amie');
-DROP TABLE IF EXISTS amie_processing_errors;
-DROP TABLE IF EXISTS amie_processing_events;
-DROP TABLE IF EXISTS amie_packets;
-DROP TABLE IF EXISTS project_memberships;
-DROP TABLE IF EXISTS projects;
-DROP TABLE IF EXISTS cluster_accounts;
+INSERT INTO resource_types (id, name, display_name, unit) VALUES
+ (UUID(), 'b200', 'NVIDIA B200 GPU Hours', 'hours'),
+ (UUID(), 'rtx6000', 'NVIDIA RTX 6000 GPU Hours', 'hours');
diff --git a/connectors/ACCESS/AMIE-Processor/go.mod
b/connectors/ACCESS/AMIE-Processor/go.mod
index 361901b18..5ea9ba61d 100644
--- a/connectors/ACCESS/AMIE-Processor/go.mod
+++ b/connectors/ACCESS/AMIE-Processor/go.mod
@@ -10,6 +10,7 @@ require (
github.com/jmoiron/sqlx v1.4.0
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
+ github.com/shopspring/decimal v1.4.0
github.com/stretchr/testify v1.10.0
google.golang.org/protobuf v1.36.7
gopkg.in/yaml.v3 v3.0.1
diff --git a/connectors/ACCESS/AMIE-Processor/go.sum
b/connectors/ACCESS/AMIE-Processor/go.sum
index 605490ad3..df2bb5faa 100644
--- a/connectors/ACCESS/AMIE-Processor/go.sum
+++ b/connectors/ACCESS/AMIE-Processor/go.sum
@@ -83,6 +83,8 @@ github.com/prometheus/procfs v0.15.1
h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod
h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rogpeppe/go-internal v1.10.0
h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod
h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/shopspring/decimal v1.4.0
h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
+github.com/shopspring/decimal v1.4.0/go.mod
h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod
h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.10.0
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
index 4e04d6659..cd9783744 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
@@ -23,6 +23,7 @@ import (
"fmt"
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/service"
coremodel "github.com/apache/airavata-custos/core/domain/model"
)
@@ -42,6 +43,10 @@ type requestProjectCreateMembershipService interface {
CreateMembership(ctx context.Context, tx *sql.Tx, projectID,
clusterAccountID, role string) (*model.ProjectMembership, error)
}
+type requestProjectCreateAllocationService interface {
+ CreateFromProjectPacket(ctx context.Context, tx *sql.Tx, project
*model.Project, body map[string]any, packetID string)
(*service.AllocationResult, error)
+}
+
type requestProjectCreateAmieClient interface {
ReplyToPacket(ctx context.Context, packetRecID int64, reply
map[string]any) error
}
@@ -55,6 +60,7 @@ type RequestProjectCreateHandler struct {
accountSvc requestProjectCreateAccountService
projectSvc requestProjectCreateProjectService
membershipSvc requestProjectCreateMembershipService
+ allocationSvc requestProjectCreateAllocationService
amieClient requestProjectCreateAmieClient
auditSvc requestProjectCreateAuditService
}
@@ -64,6 +70,7 @@ func NewRequestProjectCreateHandler(
accountSvc requestProjectCreateAccountService,
projectSvc requestProjectCreateProjectService,
membershipSvc requestProjectCreateMembershipService,
+ allocationSvc requestProjectCreateAllocationService,
amieClient requestProjectCreateAmieClient,
auditSvc requestProjectCreateAuditService,
) *RequestProjectCreateHandler {
@@ -72,6 +79,7 @@ func NewRequestProjectCreateHandler(
accountSvc: accountSvc,
projectSvc: projectSvc,
membershipSvc: membershipSvc,
+ allocationSvc: allocationSvc,
amieClient: amieClient,
auditSvc: auditSvc,
}
@@ -153,6 +161,20 @@ func (h *RequestProjectCreateHandler) Handle(ctx
context.Context, tx *sql.Tx, pa
return fmt.Errorf("request_project_create: audit
CREATE_MEMBERSHIP: %w", err)
}
+ allocResult, err := h.allocationSvc.CreateFromProjectPacket(ctx, tx,
project, body, packet.ID)
+ if err != nil {
+ return fmt.Errorf("request_project_create: creating allocation hierarchy: %w", err)
+ }
+ if allocResult.Transfer != nil {
+ summary := fmt.Sprintf("award %s (%s): %s credits",
allocResult.Award.ID, allocResult.Category,
allocResult.Award.TotalCredits.String())
+ if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID,
coremodel.AuditGrantAward, "award", allocResult.Award.ID, summary); err != nil {
+ return fmt.Errorf("request_project_create: audit GRANT_AWARD: %w", err)
+ }
+ if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID,
coremodel.AuditPostCreditTransfer, "credit_transfer", allocResult.Transfer.ID,
""); err != nil {
+ return fmt.Errorf("request_project_create: audit POST_CREDIT_TRANSFER: %w", err)
+ }
+ }
+
// Build and send the reply.
replyBody := map[string]any{
"ProjectID": project.ID,
diff --git
a/connectors/ACCESS/AMIE-Processor/handler/request_project_create_test.go
b/connectors/ACCESS/AMIE-Processor/handler/request_project_create_test.go
index b5e2c969b..2d616ec5a 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_project_create_test.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_create_test.go
@@ -22,11 +22,14 @@ import (
"database/sql"
"testing"
-
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
- coremodel "github.com/apache/airavata-custos/core/domain/model"
+ "github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
+
+
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/service"
+ coremodel "github.com/apache/airavata-custos/core/domain/model"
)
// ---------------------------------------------------------------------------
@@ -85,6 +88,16 @@ func (m *mockRPCAuditService) Log(ctx context.Context, tx
*sql.Tx, packetID, eve
return m.Called(ctx, tx, packetID, eventID, action, entityType,
entityID, summary).Error(0)
}
+type mockRPCAllocationService struct{ mock.Mock }
+
+func (m *mockRPCAllocationService) CreateFromProjectPacket(ctx
context.Context, tx *sql.Tx, project *model.Project, body map[string]any,
packetID string) (*service.AllocationResult, error) {
+ args := m.Called(ctx, tx, project, body, packetID)
+ if args.Get(0) == nil {
+ return nil, args.Error(1)
+ }
+ return args.Get(0).(*service.AllocationResult), args.Error(1)
+}
+
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
@@ -94,10 +107,11 @@ func newRPCHandler(
as *mockRPCAccountService,
prj *mockRPCProjectService,
ms *mockRPCMembershipService,
+ alloc *mockRPCAllocationService,
ac *mockRPCAmieClient,
aud *mockRPCAuditService,
) *RequestProjectCreateHandler {
- return NewRequestProjectCreateHandler(ps, as, prj, ms, ac, aud)
+ return NewRequestProjectCreateHandler(ps, as, prj, ms, alloc, ac, aud)
}
// ---------------------------------------------------------------------------
@@ -108,6 +122,7 @@ func TestRequestProjectCreateHandler_SupportsType(t
*testing.T) {
h := newRPCHandler(
&mockRPCPersonService{}, &mockRPCAccountService{},
&mockRPCProjectService{}, &mockRPCMembershipService{},
+ &mockRPCAllocationService{},
&mockRPCAmieClient{}, &mockRPCAuditService{},
)
assert.Equal(t, "request_project_create", h.SupportsType())
@@ -119,16 +134,22 @@ func TestRequestProjectCreateHandler(t *testing.T) {
tests := []struct {
name string
input map[string]any
- setupMocks func(ps *mockRPCPersonService, as
*mockRPCAccountService, prj *mockRPCProjectService, ms
*mockRPCMembershipService, ac *mockRPCAmieClient, aud *mockRPCAuditService)
+ setupMocks func(ps *mockRPCPersonService, as
*mockRPCAccountService, prj *mockRPCProjectService, ms
*mockRPCMembershipService, alloc *mockRPCAllocationService, ac
*mockRPCAmieClient, aud *mockRPCAuditService)
wantErr string
}{
{
name: "valid packet processes successfully",
input: validFixture,
- setupMocks: func(ps *mockRPCPersonService, as
*mockRPCAccountService, prj *mockRPCProjectService, ms
*mockRPCMembershipService, ac *mockRPCAmieClient, aud *mockRPCAuditService) {
+ setupMocks: func(ps *mockRPCPersonService, as
*mockRPCAccountService, prj *mockRPCProjectService, ms
*mockRPCMembershipService, alloc *mockRPCAllocationService, ac
*mockRPCAmieClient, aud *mockRPCAuditService) {
person := &coremodel.Person{ID: "person-123"}
account := &model.ClusterAccount{ID:
"account-123", Username: "hwan"}
project := &model.Project{ID: "project-123",
GrantNumber: "NNT259276"}
+ allocResult := &service.AllocationResult{
+ Award: &model.Award{ID:
"award-123", TotalCredits: decimal.NewFromInt(1), Category:
model.AwardCategoryNew},
+ Allocation: &model.Allocation{ID:
"alloc-123"},
+ Transfer: &model.CreditTransfer{ID:
"transfer-123"},
+ Category: model.AwardCategoryNew,
+ }
ps.On("FindOrCreateFromPacket", mock.Anything,
mock.Anything, mock.Anything).Return(person, nil)
aud.On("Log", mock.Anything, mock.Anything,
mock.Anything, mock.Anything, coremodel.AuditCreatePerson, mock.Anything,
mock.Anything, mock.Anything).Return(nil)
@@ -142,6 +163,10 @@ func TestRequestProjectCreateHandler(t *testing.T) {
ms.On("CreateMembership", mock.Anything,
mock.Anything, project.ID, account.ID, "PI").Return(&model.ProjectMembership{},
nil)
aud.On("Log", mock.Anything, mock.Anything,
mock.Anything, mock.Anything, coremodel.AuditCreateMembership, mock.Anything,
mock.Anything, mock.Anything).Return(nil)
+ alloc.On("CreateFromProjectPacket",
mock.Anything, mock.Anything, project, mock.Anything,
mock.Anything).Return(allocResult, nil)
+ aud.On("Log", mock.Anything, mock.Anything,
mock.Anything, mock.Anything, coremodel.AuditGrantAward, mock.Anything,
mock.Anything, mock.Anything).Return(nil)
+ aud.On("Log", mock.Anything, mock.Anything,
mock.Anything, mock.Anything, coremodel.AuditPostCreditTransfer, mock.Anything,
mock.Anything, mock.Anything).Return(nil)
+
ac.On("ReplyToPacket", mock.Anything,
int64(233497907), mock.Anything).Return(nil)
aud.On("Log", mock.Anything, mock.Anything,
mock.Anything, mock.Anything, coremodel.AuditReplySent, mock.Anything,
mock.Anything, mock.Anything).Return(nil)
},
@@ -195,14 +220,15 @@ func TestRequestProjectCreateHandler(t *testing.T) {
as := &mockRPCAccountService{}
prj := &mockRPCProjectService{}
ms := &mockRPCMembershipService{}
+ alloc := &mockRPCAllocationService{}
ac := &mockRPCAmieClient{}
aud := &mockRPCAuditService{}
if tt.setupMocks != nil {
- tt.setupMocks(ps, as, prj, ms, ac, aud)
+ tt.setupMocks(ps, as, prj, ms, alloc, ac, aud)
}
- h := newRPCHandler(ps, as, prj, ms, ac, aud)
+ h := newRPCHandler(ps, as, prj, ms, alloc, ac, aud)
packet := &model.Packet{ID: "test-id", AmieID:
233497907, Type: "request_project_create"}
err := h.Handle(context.Background(), nil, tt.input,
packet, "event-1")
diff --git a/connectors/ACCESS/AMIE-Processor/main.go
b/connectors/ACCESS/AMIE-Processor/main.go
index 3538f908f..2d8d37bb2 100644
--- a/connectors/ACCESS/AMIE-Processor/main.go
+++ b/connectors/ACCESS/AMIE-Processor/main.go
@@ -76,19 +76,25 @@ func main() {
packetStore := store.NewPacketStore(database)
eventStore := store.NewEventStore(database)
errorStore := store.NewProcessingErrorStore(database)
+ allocationSourceStore := store.NewAllocationSourceStore(database)
+ resourceTypeStore := store.NewResourceTypeStore(database)
+ awardStore := store.NewAwardStore(database)
+ allocationStore := store.NewAllocationStore(database)
+ creditTransferStore := store.NewCreditTransferStore(database)
personSvc := service.NewPersonService(personStore, personDNStore,
accountStore, externalIdentityStore)
accountSvc := service.NewUserAccountService(accountStore)
projectSvc := service.NewProjectService(projectStore)
membershipSvc := service.NewProjectMembershipService(membershipStore,
projectStore, accountStore)
auditSvc := service.NewAuditService(auditLogStore)
+ allocationSvc := service.NewAllocationService(allocationSourceStore,
resourceTypeStore, awardStore, allocationStore, creditTransferStore)
amie := amieclient.New(cfg.AMIE)
met := metrics.New()
router := handler.NewRouter(
- handler.NewRequestProjectCreateHandler(personSvc, accountSvc,
projectSvc, membershipSvc, amie, auditSvc),
+ handler.NewRequestProjectCreateHandler(personSvc, accountSvc,
projectSvc, membershipSvc, allocationSvc, amie, auditSvc),
handler.NewRequestAccountCreateHandler(personSvc, accountSvc,
projectSvc, membershipSvc, amie, auditSvc),
handler.NewRequestProjectInactivateHandler(projectSvc,
membershipSvc, amie, auditSvc),
handler.NewRequestProjectReactivateHandler(projectSvc,
membershipSvc, amie, auditSvc),
diff --git a/connectors/ACCESS/AMIE-Processor/model/allocation.go
b/connectors/ACCESS/AMIE-Processor/model/allocation.go
new file mode 100644
index 000000000..84da69ccd
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/model/allocation.go
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import (
+ "time"
+
+ "github.com/shopspring/decimal"
+)
+
+// Allocation is the resource-specific draw from an Award. BalanceCredits is the
+// running balance maintained by the credit ledger (sum of all CreditTransfers
+// for this allocation).
+type Allocation struct {
+ ID string `db:"id" json:"id"`
+ AwardID string `db:"award_id" json:"award_id"`
+ ResourceTypeID string `db:"resource_type_id"
json:"resource_type_id"`
+ SourceCredits decimal.Decimal `db:"source_credits"
json:"source_credits"`
+ BalanceCredits decimal.Decimal `db:"balance_credits"
json:"balance_credits"`
+ StartDate time.Time `db:"start_date" json:"start_date"`
+ EndDate time.Time `db:"end_date" json:"end_date"`
+ Status string `db:"status" json:"status"`
+ SlurmAccount *string `db:"slurm_account"
json:"slurm_account,omitempty"`
+ CreatedAt time.Time `db:"created_at" json:"created_at"`
+ UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
+}
+
+const (
+ AllocationStatusActive = "active"
+ AllocationStatusExpired = "expired"
+ AllocationStatusSuspended = "suspended"
+ AllocationStatusDepleted = "depleted"
+)
diff --git a/connectors/ACCESS/AMIE-Processor/model/allocation_source.go
b/connectors/ACCESS/AMIE-Processor/model/allocation_source.go
new file mode 100644
index 000000000..1c60581a5
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/model/allocation_source.go
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import "time"
+
+// AllocationSource is an upstream authority that grants allocations.
+// e.g., ('access', 'ACCESS-CI', 'amie').
+type AllocationSource struct {
+ ID string `db:"id" json:"id"`
+ Name string `db:"name" json:"name"`
+ DisplayName string `db:"display_name" json:"display_name"`
+ AdapterType string `db:"adapter_type" json:"adapter_type"`
+ IsActive bool `db:"is_active" json:"is_active"`
+ CreatedAt time.Time `db:"created_at" json:"created_at"`
+ UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
+}
+
+const SourceNameAccess = "access"
diff --git a/connectors/ACCESS/AMIE-Processor/model/award.go
b/connectors/ACCESS/AMIE-Processor/model/award.go
new file mode 100644
index 000000000..624287dff
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/model/award.go
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import (
+ "time"
+
+ "github.com/shopspring/decimal"
+)
+
+// Award is the approved credit pool granted to a Project from an
+// AllocationSource. Supplements/renewals create a new Award row tied to the
+// previous one via ParentAwardID.
+type Award struct {
+ ID string `db:"id" json:"id"`
+ ProjectID string `db:"project_id" json:"project_id"`
+ SourceID string `db:"source_id" json:"source_id"`
+ ParentAwardID *string `db:"parent_award_id"
json:"parent_award_id,omitempty"`
+ ExternalID *string `db:"external_id"
json:"external_id,omitempty"`
+ TotalCredits decimal.Decimal `db:"total_credits" json:"total_credits"`
+ Category string `db:"category" json:"category"`
+ StartDate time.Time `db:"start_date" json:"start_date"`
+ EndDate time.Time `db:"end_date" json:"end_date"`
+ Status string `db:"status" json:"status"`
+ CreatedAt time.Time `db:"created_at" json:"created_at"`
+ UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
+}
+
+const (
+ AwardCategoryNew = "new"
+ AwardCategorySupplement = "supplement"
+ AwardCategoryRenewal = "renewal"
+)
+
+const (
+ AwardStatusPending = "pending"
+ AwardStatusActive = "active"
+ AwardStatusExpired = "expired"
+ AwardStatusSuspended = "suspended"
+ AwardStatusCancelled = "cancelled"
+)
diff --git a/connectors/ACCESS/AMIE-Processor/model/credit_transfer.go
b/connectors/ACCESS/AMIE-Processor/model/credit_transfer.go
new file mode 100644
index 000000000..6862541ee
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/model/credit_transfer.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import (
+ "time"
+
+ "github.com/shopspring/decimal"
+)
+
+// CreditTransfer is an immutable ledger event for a credit movement against an
+// Allocation. Append-only at the DB level (BEFORE UPDATE / BEFORE DELETE
+// triggers reject mutations).
+type CreditTransfer struct {
+ ID string `db:"id" json:"id"`
+ AllocationID string `db:"allocation_id" json:"allocation_id"`
+ TransferType string `db:"transfer_type" json:"transfer_type"`
+ Amount decimal.Decimal `db:"amount" json:"amount"`
+ BalanceAfter decimal.Decimal `db:"balance_after" json:"balance_after"`
+ ReferenceType *string `db:"reference_type"
json:"reference_type,omitempty"`
+ ReferenceID *string `db:"reference_id"
json:"reference_id,omitempty"`
+ Description *string `db:"description"
json:"description,omitempty"`
+ PerformedByID *string `db:"performed_by_id"
json:"performed_by_id,omitempty"`
+ CreatedAt time.Time `db:"created_at" json:"created_at"`
+}
+
+const (
+ TransferTypeGrant = "GRANT"
+ TransferTypeUsageCharge = "USAGE_CHARGE"
+ TransferTypeAdjustment = "ADJUSTMENT"
+ TransferTypeExpiration = "EXPIRATION"
+)
+
+const (
+ TransferReferenceAMIEPacket = "amie_packet"
+)
diff --git a/connectors/ACCESS/AMIE-Processor/model/resource_type.go
b/connectors/ACCESS/AMIE-Processor/model/resource_type.go
new file mode 100644
index 000000000..bb196d05e
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/model/resource_type.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import "time"
+
+// ResourceType maps a logical resource (CPU, GPU, storage) to its unit and the
+// SLURM TRES name used for accounting.
+type ResourceType struct {
+ ID string `db:"id" json:"id"`
+ Name string `db:"name" json:"name"`
+ DisplayName string `db:"display_name" json:"display_name"`
+ Unit string `db:"unit" json:"unit"`
+ SlurmTRES *string `db:"slurm_tres" json:"slurm_tres,omitempty"`
+ IsActive bool `db:"is_active" json:"is_active"`
+ CreatedAt time.Time `db:"created_at" json:"created_at"`
+ UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
+}
+
+const (
+ ResourceTypeNameCPU = "cpu"
+ ResourceTypeNameGPU = "gpu"
+ ResourceTypeNameStorage = "storage"
+)
diff --git a/connectors/ACCESS/AMIE-Processor/service/allocation_service.go
b/connectors/ACCESS/AMIE-Processor/service/allocation_service.go
new file mode 100644
index 000000000..1900d9edd
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/service/allocation_service.go
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package service
+
+import (
+	"context"
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"log/slog"
+	"math"
+	"strings"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/shopspring/decimal"
+
+	"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// allocationSourceStore is the consumer-side view of the allocation source
+// store: only the lookup AllocationService needs.
+type allocationSourceStore interface {
+ FindByName(ctx context.Context, name string) (*model.AllocationSource,
error)
+}
+
+// resourceTypeStore is the consumer-side view of the resource type store:
+// AllocationService only needs to resolve (or create) a type by name.
+type resourceTypeStore interface {
+ EnsureByName(ctx context.Context, tx *sql.Tx, name string)
(*model.ResourceType, error)
+}
+
+// awardStore is the consumer-side view of the award store used by
+// AllocationService. Note that the lookup takes no transaction while Save
+// writes through the supplied tx.
+type awardStore interface {
+ FindCurrentForProjectAndResource(ctx context.Context, projectID,
resourceTypeID string) (*model.Award, error)
+ Save(ctx context.Context, tx *sql.Tx, a *model.Award) error
+}
+
+// allocationStore is the consumer-side view of the allocation store used by
+// AllocationService. The lookup takes no transaction; Save and UpdateBalance
+// write through the supplied tx.
+type allocationStore interface {
+ FindByAwardAndResource(ctx context.Context, awardID, resourceTypeID
string) (*model.Allocation, error)
+ Save(ctx context.Context, tx *sql.Tx, a *model.Allocation) error
+ UpdateBalance(ctx context.Context, tx *sql.Tx, allocationID string,
newBalance decimal.Decimal) error
+}
+
+// creditTransferStore is the consumer-side view of the credit transfer store:
+// AllocationService only ever inserts transfers.
+type creditTransferStore interface {
+ Save(ctx context.Context, tx *sql.Tx, t *model.CreditTransfer) error
+}
+
+// AllocationResult is what the allocation service returns to the caller (the
+// AMIE handler). After a successful grant Award, Allocation and Transfer are
+// all set; for supplements Allocation is the existing, topped-up allocation
+// rather than a newly created one. When a duplicate "new" packet is skipped,
+// only Award (the already-existing award) and Category are set.
+type AllocationResult struct {
+ Award *model.Award
+ Allocation *model.Allocation
+ Transfer *model.CreditTransfer
+ Category string // resolved category: "new", "supplement", "renewal"
+}
+
+// AllocationService turns AMIE project packets into awards, allocations and
+// credit transfers. It holds one store per table it reads or writes.
+type AllocationService struct {
+ sources allocationSourceStore
+ resourceTypes resourceTypeStore
+ awards awardStore
+ allocations allocationStore
+ transfers creditTransferStore
+}
+
+// NewAllocationService builds an AllocationService around the given stores.
+// The stores are kept exactly as passed in; nothing is validated here.
+func NewAllocationService(
+	sources allocationSourceStore,
+	resourceTypes resourceTypeStore,
+	awards awardStore,
+	allocations allocationStore,
+	transfers creditTransferStore,
+) *AllocationService {
+	svc := new(AllocationService)
+	svc.sources = sources
+	svc.resourceTypes = resourceTypes
+	svc.awards = awards
+	svc.allocations = allocations
+	svc.transfers = transfers
+	return svc
+}
+
+// CreateFromProjectPacket records the allocation described by an AMIE project
+// packet. It branches on AllocationType:
+//   - "new" (or empty/unknown): new Award + new Allocation + GRANT
+//   - "supplement": new Award (chained) + top up existing Allocation + GRANT
+//   - "renewal": new Award (chained) + new Allocation + GRANT
+//
+// A supplement or renewal for a project that has no active Award on the
+// resource is logged and handled as "new". If AllocationType resolves to "new"
+// but the project already has an active Award for this resource, the call is a
+// no-op and the returned AllocationResult carries only the existing Award and
+// the Category (nil Allocation and Transfer) so the caller can audit it as a
+// duplicate.
+//
+// body must carry ServiceUnitsAllocated (> 0), StartDate, EndDate (not before
+// StartDate) and a non-empty ResourceList; only the first resource is used.
+// All writes go through tx; this function never commits or rolls it back.
+// packetID is stored as the reference id of the GRANT transfer.
+func (s *AllocationService) CreateFromProjectPacket(
+	ctx context.Context,
+	tx *sql.Tx,
+	project *model.Project,
+	body map[string]any,
+	packetID string,
+) (*AllocationResult, error) {
+	// project.ID is dereferenced below; fail with an error instead of a panic.
+	if project == nil {
+		return nil, fmt.Errorf("allocation_service: project is required")
+	}
+
+	serviceUnits, ok := parseDecimal(body["ServiceUnitsAllocated"])
+	if !ok {
+		return nil, fmt.Errorf("allocation_service: ServiceUnitsAllocated is required")
+	}
+	if serviceUnits.LessThanOrEqual(decimal.Zero) {
+		return nil, fmt.Errorf("allocation_service: ServiceUnitsAllocated must be positive, got %s", serviceUnits)
+	}
+
+	startDate, err := parseDate(body["StartDate"])
+	if err != nil {
+		return nil, fmt.Errorf("allocation_service: parsing StartDate: %w", err)
+	}
+	endDate, err := parseDate(body["EndDate"])
+	if err != nil {
+		return nil, fmt.Errorf("allocation_service: parsing EndDate: %w", err)
+	}
+	// An award that ends before it starts can never be usable; reject it
+	// before anything is written.
+	if endDate.Before(startDate) {
+		return nil, fmt.Errorf("allocation_service: EndDate %s is before StartDate %s",
+			endDate.Format(time.RFC3339), startDate.Format(time.RFC3339))
+	}
+
+	resourceName := firstResource(body)
+	if resourceName == "" {
+		return nil, fmt.Errorf("allocation_service: ResourceList must contain at least one resource")
+	}
+	resourceType, err := s.resourceTypes.EnsureByName(ctx, tx, resourceName)
+	if err != nil {
+		return nil, fmt.Errorf("allocation_service: ensuring resource type %s: %w", resourceName, err)
+	}
+
+	source, err := s.sources.FindByName(ctx, model.SourceNameAccess)
+	if err != nil {
+		return nil, fmt.Errorf("allocation_service: looking up source %s: %w", model.SourceNameAccess, err)
+	}
+	if source == nil {
+		return nil, fmt.Errorf("allocation_service: source %s not configured", model.SourceNameAccess)
+	}
+
+	category := normalizeAllocationType(getString(body, "AllocationType"))
+
+	existing, err := s.awards.FindCurrentForProjectAndResource(ctx, project.ID, resourceType.ID)
+	if err != nil {
+		return nil, fmt.Errorf("allocation_service: finding existing award: %w", err)
+	}
+
+	switch category {
+	case model.AwardCategorySupplement, model.AwardCategoryRenewal:
+		if existing == nil {
+			// Nothing to chain to: fall back to a plain "new" grant.
+			slog.WarnContext(ctx, "allocation_service: no existing award found for supplement/renewal; treating as new",
+				"project_id", project.ID,
+				"resource", resourceName,
+				"category", category)
+			category = model.AwardCategoryNew
+		}
+	default:
+		if existing != nil {
+			slog.WarnContext(ctx, "allocation_service: project already has an active award for this resource; skipping duplicate 'new'",
+				"project_id", project.ID,
+				"resource", resourceName,
+				"existing_award_id", existing.ID,
+				"packet_id", packetID)
+			return &AllocationResult{Award: existing, Category: category}, nil
+		}
+	}
+
+	award := &model.Award{
+		ID:           uuid.NewString(),
+		ProjectID:    project.ID,
+		SourceID:     source.ID,
+		TotalCredits: serviceUnits,
+		Category:     category,
+		StartDate:    startDate,
+		EndDate:      endDate,
+		Status:       model.AwardStatusActive,
+	}
+	// existing is non-nil here only for supplements and renewals (a "new"
+	// packet with an existing award returned above), so this chains them.
+	if existing != nil {
+		parentID := existing.ID
+		award.ParentAwardID = &parentID
+	}
+	if err := s.awards.Save(ctx, tx, award); err != nil {
+		return nil, fmt.Errorf("allocation_service: saving award: %w", err)
+	}
+
+	var (
+		allocation *model.Allocation
+		newBalance decimal.Decimal
+	)
+
+	if category == model.AwardCategorySupplement {
+		// Supplements top up the allocation that hangs off the parent award.
+		// NOTE(review): FindByAwardAndResource takes no tx, so this
+		// read-modify-write is not covered by the transaction; confirm packets
+		// for one project are processed serially.
+		existingAlloc, err := s.allocations.FindByAwardAndResource(ctx, existing.ID, resourceType.ID)
+		if err != nil {
+			return nil, fmt.Errorf("allocation_service: finding existing allocation: %w", err)
+		}
+		if existingAlloc == nil {
+			return nil, fmt.Errorf("allocation_service: existing award %s has no allocation for resource %s", existing.ID, resourceName)
+		}
+		newBalance = existingAlloc.BalanceCredits.Add(serviceUnits)
+		if err := s.allocations.UpdateBalance(ctx, tx, existingAlloc.ID, newBalance); err != nil {
+			return nil, fmt.Errorf("allocation_service: updating allocation balance: %w", err)
+		}
+		existingAlloc.BalanceCredits = newBalance
+		allocation = existingAlloc
+	} else {
+		allocation = &model.Allocation{
+			ID:             uuid.NewString(),
+			AwardID:        award.ID,
+			ResourceTypeID: resourceType.ID,
+			SourceCredits:  serviceUnits,
+			BalanceCredits: serviceUnits,
+			StartDate:      startDate,
+			EndDate:        endDate,
+			Status:         model.AllocationStatusActive,
+		}
+		if err := s.allocations.Save(ctx, tx, allocation); err != nil {
+			return nil, fmt.Errorf("allocation_service: saving allocation: %w", err)
+		}
+		newBalance = serviceUnits
+	}
+
+	// The GRANT transfer always targets the allocation selected above, whether
+	// it was topped up or newly created.
+	refType := model.TransferReferenceAMIEPacket
+	refID := packetID
+	description := fmt.Sprintf("GRANT from %s award (%s)", source.Name, category)
+	transfer := &model.CreditTransfer{
+		ID:            uuid.NewString(),
+		AllocationID:  allocation.ID,
+		TransferType:  model.TransferTypeGrant,
+		Amount:        serviceUnits,
+		BalanceAfter:  newBalance,
+		ReferenceType: &refType,
+		ReferenceID:   &refID,
+		Description:   &description,
+		CreatedAt:     time.Now().UTC(),
+	}
+	if err := s.transfers.Save(ctx, tx, transfer); err != nil {
+		return nil, fmt.Errorf("allocation_service: saving credit transfer: %w", err)
+	}
+
+	return &AllocationResult{
+		Award:      award,
+		Allocation: allocation,
+		Transfer:   transfer,
+		Category:   category,
+	}, nil
+}
+
+// normalizeAllocationType folds the AMIE AllocationType field onto one of the
+// internal award categories. Matching ignores case, surrounding whitespace and
+// any spaces, underscores or hyphens. Empty and unrecognised values resolve to
+// "new", so unfamiliar packets are processed rather than rejected.
+func normalizeAllocationType(raw string) string {
+	separators := strings.NewReplacer(" ", "", "_", "", "-", "")
+	key := separators.Replace(strings.ToLower(strings.TrimSpace(raw)))
+
+	if key == "supplement" || key == "supplemental" {
+		return model.AwardCategorySupplement
+	}
+	if key == "renewal" || key == "renew" {
+		return model.AwardCategoryRenewal
+	}
+	// "new", the empty string and everything else.
+	return model.AwardCategoryNew
+}
+
+// firstResource returns the first entry of body["ResourceList"], trimmed and
+// lowercased. ResourceList may be a []any (JSON-decoded, AMIE-conformant), a
+// []string (a body assembled in Go code) or a bare string (mock-server
+// simplification). It returns "" when the key is missing, the list is empty,
+// the first entry is not a string, or the value has any other type.
+func firstResource(body map[string]any) string {
+	var first string
+	switch list := body["ResourceList"].(type) {
+	case []any:
+		if len(list) > 0 {
+			// A non-string first entry deliberately yields "".
+			first, _ = list[0].(string)
+		}
+	case []string:
+		if len(list) > 0 {
+			first = list[0]
+		}
+	case string:
+		first = list
+	}
+	return strings.ToLower(strings.TrimSpace(first))
+}
+
+// parseDecimal converts a loosely typed packet value into a decimal. It accepts
+// float64, float32, int, int64, json.Number (produced by a decoder using
+// UseNumber) and numeric strings, ignoring whitespace around strings.
+// The boolean is false when v is nil, has an unsupported type, is a string
+// that does not parse, or is a NaN/infinite float.
+func parseDecimal(v any) (decimal.Decimal, bool) {
+	switch x := v.(type) {
+	case nil:
+		return decimal.Zero, false
+	case float64:
+		// decimal.NewFromFloat panics on NaN and ±Inf, so reject them here.
+		if math.IsNaN(x) || math.IsInf(x, 0) {
+			return decimal.Zero, false
+		}
+		return decimal.NewFromFloat(x), true
+	case float32:
+		return parseDecimal(float64(x))
+	case int:
+		return decimal.NewFromInt(int64(x)), true
+	case int64:
+		return decimal.NewFromInt(x), true
+	case json.Number:
+		// Parse the literal text so no precision is lost through float64.
+		return parseDecimal(x.String())
+	case string:
+		d, err := decimal.NewFromString(strings.TrimSpace(x))
+		if err != nil {
+			return decimal.Zero, false
+		}
+		return d, true
+	default:
+		return decimal.Zero, false
+	}
+}
+
+// parseDate accepts dates as "YYYY-MM-DD", RFC3339, or time.Time directly.
+func parseDate(v any) (time.Time, error) {
+ switch x := v.(type) {
+ case nil:
+ return time.Time{}, fmt.Errorf("date is required")
+ case time.Time:
+ return x, nil
+ case string:
+ if t, err := time.Parse("2006-01-02", x); err == nil {
+ return t, nil
+ }
+ if t, err := time.Parse(time.RFC3339, x); err == nil {
+ return t, nil
+ }
+ return time.Time{}, fmt.Errorf("unrecognized date format: %s",
x)
+ default:
+ return time.Time{}, fmt.Errorf("unsupported date type: %T", v)
+ }
+}
+
+// getString returns body[key] when it holds a string, and "" when the key is
+// missing or the value has any other type.
+func getString(body map[string]any, key string) string {
+	if s, isString := body[key].(string); isString {
+		return s
+	}
+	return ""
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/allocation_source_store.go
b/connectors/ACCESS/AMIE-Processor/store/allocation_source_store.go
new file mode 100644
index 000000000..418ee9dce
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/store/allocation_source_store.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+
+ "github.com/jmoiron/sqlx"
+
+
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// allocationSourceColumns is the column list selected when reading rows from
+// allocation_sources.
+const allocationSourceColumns = `id, name, display_name, adapter_type,
is_active, created_at, updated_at`
+
+// mariaDBAllocationSourceStore reads allocation sources through an sqlx
+// database handle.
+type mariaDBAllocationSourceStore struct {
+ db *sqlx.DB
+}
+
+// NewAllocationSourceStore returns an AllocationSourceStore that queries db.
+func NewAllocationSourceStore(db *sqlx.DB) AllocationSourceStore {
+	store := new(mariaDBAllocationSourceStore)
+	store.db = db
+	return store
+}
+
+func (s *mariaDBAllocationSourceStore) FindByName(ctx context.Context, name
string) (*model.AllocationSource, error) {
+ var src model.AllocationSource
+ err := s.db.GetContext(ctx, &src,
+ `SELECT `+allocationSourceColumns+` FROM allocation_sources
WHERE name = ? LIMIT 1`, name)
+ if err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, nil
+ }
+ return nil, err
+ }
+ return &src, nil
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/allocation_store.go
b/connectors/ACCESS/AMIE-Processor/store/allocation_store.go
new file mode 100644
index 000000000..c32a95bd1
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/store/allocation_store.go
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+
+ "github.com/jmoiron/sqlx"
+ "github.com/shopspring/decimal"
+
+
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// allocationColumns is the column list selected when reading rows from
+// allocations.
+const allocationColumns = `id, award_id, resource_type_id, source_credits,
balance_credits, start_date, end_date, status, slurm_account, created_at,
updated_at`
+
+// mariaDBAllocationStore reads allocations through an sqlx database handle;
+// its write methods execute on the *sql.Tx passed to them instead.
+type mariaDBAllocationStore struct {
+ db *sqlx.DB
+}
+
+// NewAllocationStore returns an AllocationStore that queries db.
+func NewAllocationStore(db *sqlx.DB) AllocationStore {
+	store := new(mariaDBAllocationStore)
+	store.db = db
+	return store
+}
+
+func (s *mariaDBAllocationStore) FindByID(ctx context.Context, id string)
(*model.Allocation, error) {
+ var a model.Allocation
+ err := s.db.GetContext(ctx, &a,
+ `SELECT `+allocationColumns+` FROM allocations WHERE id = ?`,
id)
+ if err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, nil
+ }
+ return nil, err
+ }
+ return &a, nil
+}
+
+func (s *mariaDBAllocationStore) FindByAwardAndResource(ctx context.Context,
awardID, resourceTypeID string) (*model.Allocation, error) {
+ var a model.Allocation
+ err := s.db.GetContext(ctx, &a,
+ `SELECT `+allocationColumns+`
+ FROM allocations
+ WHERE award_id = ? AND resource_type_id = ?
+ LIMIT 1`,
+ awardID, resourceTypeID)
+ if err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, nil
+ }
+ return nil, err
+ }
+ return &a, nil
+}
+
+func (s *mariaDBAllocationStore) Save(ctx context.Context, tx *sql.Tx, a
*model.Allocation) error {
+ _, err := tx.ExecContext(ctx,
+ `INSERT INTO allocations
+ (id, award_id, resource_type_id, source_credits,
balance_credits,
+ start_date, end_date, status, slurm_account)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ a.ID, a.AwardID, a.ResourceTypeID, a.SourceCredits,
a.BalanceCredits,
+ a.StartDate, a.EndDate, a.Status, a.SlurmAccount)
+ return err
+}
+
+func (s *mariaDBAllocationStore) UpdateBalance(ctx context.Context, tx
*sql.Tx, allocationID string, newBalance decimal.Decimal) error {
+ _, err := tx.ExecContext(ctx,
+ `UPDATE allocations SET balance_credits = ? WHERE id = ?`,
+ newBalance, allocationID)
+ return err
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/award_store.go
b/connectors/ACCESS/AMIE-Processor/store/award_store.go
new file mode 100644
index 000000000..56b1e5b3c
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/store/award_store.go
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+
+ "github.com/jmoiron/sqlx"
+
+
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// awardColumns is the unqualified column list selected when reading rows from
+// awards.
+const awardColumns = `id, project_id, source_id, parent_award_id, external_id,
total_credits, category, start_date, end_date, status, created_at, updated_at`
+
+// mariaDBAwardStore reads awards through an sqlx database handle; Save
+// executes on the *sql.Tx passed to it instead.
+type mariaDBAwardStore struct {
+ db *sqlx.DB
+}
+
+// NewAwardStore returns an AwardStore that queries db.
+func NewAwardStore(db *sqlx.DB) AwardStore {
+	store := new(mariaDBAwardStore)
+	store.db = db
+	return store
+}
+
+func (s *mariaDBAwardStore) FindByID(ctx context.Context, id string)
(*model.Award, error) {
+ var a model.Award
+ err := s.db.GetContext(ctx, &a,
+ `SELECT `+awardColumns+` FROM awards WHERE id = ?`, id)
+ if err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, nil
+ }
+ return nil, err
+ }
+ return &a, nil
+}
+
+// FindCurrentForProjectAndResource returns the most recently created active
+// Award for (project, resource_type). Used to determine the parent award when
+// processing supplements or renewals.
+func (s *mariaDBAwardStore) FindCurrentForProjectAndResource(ctx
context.Context, projectID, resourceTypeID string) (*model.Award, error) {
+ var a model.Award
+ err := s.db.GetContext(ctx, &a,
+ `SELECT `+awardColumnsWithPrefix("a")+`
+ FROM awards a
+ INNER JOIN allocations al ON al.award_id = a.id
+ WHERE a.project_id = ?
+ AND al.resource_type_id = ?
+ AND a.status = ?
+ ORDER BY a.created_at DESC
+ LIMIT 1`,
+ projectID, resourceTypeID, model.AwardStatusActive)
+ if err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, nil
+ }
+ return nil, err
+ }
+ return &a, nil
+}
+
+func (s *mariaDBAwardStore) Save(ctx context.Context, tx *sql.Tx, a
*model.Award) error {
+ _, err := tx.ExecContext(ctx,
+ `INSERT INTO awards
+ (id, project_id, source_id, parent_award_id, external_id,
total_credits,
+ category, start_date, end_date, status)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ a.ID, a.ProjectID, a.SourceID, a.ParentAwardID, a.ExternalID,
a.TotalCredits,
+ a.Category, a.StartDate, a.EndDate, a.Status)
+ return err
+}
+
+// awardColumnsWithPrefix returns the award column list with every column
+// qualified as "<alias>.<column>", for use in queries that join awards with
+// other tables.
+func awardColumnsWithPrefix(alias string) string {
+	columns := [...]string{
+		"id", "project_id", "source_id", "parent_award_id", "external_id",
+		"total_credits", "category", "start_date", "end_date", "status",
+		"created_at", "updated_at",
+	}
+
+	qualified := ""
+	for i, column := range columns {
+		if i > 0 {
+			qualified += ", "
+		}
+		qualified += alias + "." + column
+	}
+	return qualified
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/credit_transfer_store.go
b/connectors/ACCESS/AMIE-Processor/store/credit_transfer_store.go
new file mode 100644
index 000000000..55b27cdb8
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/store/credit_transfer_store.go
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/jmoiron/sqlx"
+ "github.com/shopspring/decimal"
+
+
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// mariaDBCreditTransferStore reads credit transfers through an sqlx database
+// handle; Save executes on the *sql.Tx passed to it instead.
+type mariaDBCreditTransferStore struct {
+ db *sqlx.DB
+}
+
+// NewCreditTransferStore returns a CreditTransferStore that queries db.
+func NewCreditTransferStore(db *sqlx.DB) CreditTransferStore {
+	store := new(mariaDBCreditTransferStore)
+	store.db = db
+	return store
+}
+
+func (s *mariaDBCreditTransferStore) Save(ctx context.Context, tx *sql.Tx, t
*model.CreditTransfer) error {
+ _, err := tx.ExecContext(ctx,
+ `INSERT INTO credit_transfers
+ (id, allocation_id, transfer_type, amount, balance_after,
+ reference_type, reference_id, description, performed_by_id)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+ t.ID, t.AllocationID, t.TransferType, t.Amount, t.BalanceAfter,
+ t.ReferenceType, t.ReferenceID, t.Description, t.PerformedByID)
+ return err
+}
+
+// SumByAllocation returns the sum of amount over all credit transfers recorded
+// for allocationID, or zero when there are none (COALESCE).
+func (s *mariaDBCreditTransferStore) SumByAllocation(ctx context.Context, allocationID string) (decimal.Decimal, error) {
+	const sumQuery = `SELECT COALESCE(SUM(amount), 0) FROM credit_transfers WHERE allocation_id = ?`
+
+	var total decimal.Decimal
+	err := s.db.GetContext(ctx, &total, sumQuery, allocationID)
+	return total, err
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/resource_type_store.go
b/connectors/ACCESS/AMIE-Processor/store/resource_type_store.go
new file mode 100644
index 000000000..5aa001675
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/store/resource_type_store.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/jmoiron/sqlx"
+
+
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// resourceTypeColumns is the column list selected when reading rows from
+// resource_types.
+const resourceTypeColumns = `id, name, display_name, unit, slurm_tres,
is_active, created_at, updated_at`
+
+// mariaDBResourceTypeStore reads resource types through an sqlx database
+// handle; EnsureByName additionally writes on the *sql.Tx passed to it.
+type mariaDBResourceTypeStore struct {
+ db *sqlx.DB
+}
+
+// NewResourceTypeStore returns a ResourceTypeStore that queries db.
+func NewResourceTypeStore(db *sqlx.DB) ResourceTypeStore {
+	store := new(mariaDBResourceTypeStore)
+	store.db = db
+	return store
+}
+
+func (s *mariaDBResourceTypeStore) FindByName(ctx context.Context, name
string) (*model.ResourceType, error) {
+ var rt model.ResourceType
+ err := s.db.GetContext(ctx, &rt,
+ `SELECT `+resourceTypeColumns+` FROM resource_types WHERE name
= ? LIMIT 1`, name)
+ if err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, nil
+ }
+ return nil, err
+ }
+ return &rt, nil
+}
+
+func (s *mariaDBResourceTypeStore) EnsureByName(ctx context.Context, tx
*sql.Tx, name string) (*model.ResourceType, error) {
+ if rt, err := s.FindByName(ctx, name); err != nil {
+ return nil, err
+ } else if rt != nil {
+ return rt, nil
+ }
+
+ id := uuid.NewString()
+ now := time.Now().UTC()
+ if _, err := tx.ExecContext(ctx,
+ `INSERT INTO resource_types (id, name, display_name, unit)
VALUES (?, ?, ?, 'hours')
+ ON DUPLICATE KEY UPDATE id = id`,
+ id, name, name); err != nil {
+ return nil, err
+ }
+ return &model.ResourceType{
+ ID: id,
+ Name: name,
+ DisplayName: name,
+ Unit: "hours",
+ IsActive: true,
+ CreatedAt: now,
+ UpdatedAt: now,
+ }, nil
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/stores.go
b/connectors/ACCESS/AMIE-Processor/store/stores.go
index e02b60274..c8a05124a 100644
--- a/connectors/ACCESS/AMIE-Processor/store/stores.go
+++ b/connectors/ACCESS/AMIE-Processor/store/stores.go
@@ -21,6 +21,8 @@ import (
"context"
"database/sql"
+ "github.com/shopspring/decimal"
+
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
)
@@ -49,3 +51,36 @@ type MembershipStore interface {
Save(ctx context.Context, tx *sql.Tx, m *model.ProjectMembership) error
Update(ctx context.Context, tx *sql.Tx, m *model.ProjectMembership)
error
}
+
+// AllocationSourceStore looks up allocation sources by name.
+type AllocationSourceStore interface {
+ FindByName(ctx context.Context, name string) (*model.AllocationSource,
error)
+}
+
+// ResourceTypeStore looks up resource types by name and can create missing
+// ones on demand.
+type ResourceTypeStore interface {
+ FindByName(ctx context.Context, name string) (*model.ResourceType,
error)
+ // EnsureByName returns the resource_type for `name`, creating one with
+ // default attributes if it doesn't exist. Used by the AMIE handler to
+ // auto-register site-specific resources observed in incoming packets;
+ // site admins can later customize display_name/unit/slurm_tres.
+ EnsureByName(ctx context.Context, tx *sql.Tx, name string)
(*model.ResourceType, error)
+}
+
+// AwardStore reads and inserts awards. The Find methods take no transaction;
+// Save writes through the supplied tx.
+type AwardStore interface {
+ FindByID(ctx context.Context, id string) (*model.Award, error)
+ FindCurrentForProjectAndResource(ctx context.Context, projectID,
resourceTypeID string) (*model.Award, error)
+ Save(ctx context.Context, tx *sql.Tx, a *model.Award) error
+}
+
+// AllocationStore reads allocations, inserts new ones and updates their
+// balance. The Find methods take no transaction; Save and UpdateBalance write
+// through the supplied tx.
+type AllocationStore interface {
+ FindByID(ctx context.Context, id string) (*model.Allocation, error)
+ FindByAwardAndResource(ctx context.Context, awardID, resourceTypeID
string) (*model.Allocation, error)
+ Save(ctx context.Context, tx *sql.Tx, a *model.Allocation) error
+ UpdateBalance(ctx context.Context, tx *sql.Tx, allocationID string,
newBalance decimal.Decimal) error
+}
+
+// CreditTransferStore is intentionally append-only at the interface level:
+// transfers can be inserted (Save) and aggregated (SumByAllocation), but there
+// are no Update or Delete methods, mirroring the DB-level append-only triggers.
+type CreditTransferStore interface {
+ Save(ctx context.Context, tx *sql.Tx, t *model.CreditTransfer) error
+ SumByAllocation(ctx context.Context, allocationID string)
(decimal.Decimal, error)
+}
diff --git a/core/domain/model/audit_log.go b/core/domain/model/audit_log.go
index fc6c6b650..99164deb4 100644
--- a/core/domain/model/audit_log.go
+++ b/core/domain/model/audit_log.go
@@ -39,6 +39,9 @@ const (
AuditReactivateMembership AuditAction = "REACTIVATE_MEMBERSHIP"
AuditReplySent AuditAction = "REPLY_SENT"
AuditTransactionComplete AuditAction = "TRANSACTION_COMPLETE"
+
+ AuditGrantAward AuditAction = "GRANT_AWARD"
+ AuditPostCreditTransfer AuditAction = "POST_CREDIT_TRANSFER"
)
// AuditLog is a generic audit record. Connector- or surface-specific context