This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch allocation-management
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git


The following commit(s) were added to refs/heads/allocation-management by this 
push:
     new c703596f0 Implement allocation hierarchy in AMIE handler
c703596f0 is described below

commit c703596f0a2f75ab2847ffb5b8cacfda4f7eac11
Author: lahiruj <[email protected]>
AuthorDate: Tue May 12 21:36:26 2026 -0400

    Implement allocation hierarchy in AMIE handler
---
 .../db/migrations/000001_initial_schema.down.sql   |   8 +
 .../db/migrations/000001_initial_schema.up.sql     | 100 ++++++
 .../dev-seed.sql}                                  |  17 +-
 connectors/ACCESS/AMIE-Processor/go.mod            |   1 +
 connectors/ACCESS/AMIE-Processor/go.sum            |   2 +
 .../handler/request_project_create.go              |  22 ++
 .../handler/request_project_create_test.go         |  40 ++-
 connectors/ACCESS/AMIE-Processor/main.go           |   8 +-
 .../ACCESS/AMIE-Processor/model/allocation.go      |  48 +++
 .../AMIE-Processor/model/allocation_source.go      |  34 +++
 connectors/ACCESS/AMIE-Processor/model/award.go    |  56 ++++
 .../ACCESS/AMIE-Processor/model/credit_transfer.go |  51 ++++
 .../ACCESS/AMIE-Processor/model/resource_type.go   |  39 +++
 .../AMIE-Processor/service/allocation_service.go   | 334 +++++++++++++++++++++
 .../store/allocation_source_store.go               |  51 ++++
 .../AMIE-Processor/store/allocation_store.go       |  87 ++++++
 .../ACCESS/AMIE-Processor/store/award_store.go     | 101 +++++++
 .../AMIE-Processor/store/credit_transfer_store.go  |  55 ++++
 .../AMIE-Processor/store/resource_type_store.go    |  79 +++++
 connectors/ACCESS/AMIE-Processor/store/stores.go   |  35 +++
 core/domain/model/audit_log.go                     |   3 +
 21 files changed, 1157 insertions(+), 14 deletions(-)

diff --git 
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql 
b/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
index 88d0ffac7..17828bedf 100644
--- 
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
+++ 
b/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
@@ -15,6 +15,14 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
+DROP TRIGGER IF EXISTS credit_transfers_no_delete; -- triggers are removed before the table they are defined on
+DROP TRIGGER IF EXISTS credit_transfers_no_update;
+DROP TABLE IF EXISTS credit_transfers; -- child of allocations and persons (fk_ct_allocation, fk_ct_person)
+DROP TABLE IF EXISTS allocations; -- child of awards and resource_types (fk_alloc_award, fk_alloc_resource)
+DROP TABLE IF EXISTS awards; -- child of projects and allocation_sources (fk_awards_project, fk_awards_source)
+DROP TABLE IF EXISTS resource_types; -- registry table, no outgoing foreign keys
+DROP TABLE IF EXISTS allocation_sources; -- registry table, no outgoing foreign keys
+
 DROP TABLE IF EXISTS amie_processing_errors;
 DROP TABLE IF EXISTS amie_processing_events;
 DROP TABLE IF EXISTS amie_packets;
diff --git 
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.up.sql 
b/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.up.sql
index a63569421..8075bfa10 100644
--- 
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.up.sql
+++ 
b/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.up.sql
@@ -115,3 +115,103 @@ CREATE TABLE IF NOT EXISTS amie_processing_errors
     KEY idx_errors_amie_event_id (event_id),
     KEY idx_errors_amie_occurred_at (occurred_at)
 ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+-- ============================================================
+-- Allocation hierarchy: registries, awards, allocations, ledger
+-- ============================================================
+
+CREATE TABLE IF NOT EXISTS allocation_sources -- registry of upstream authorities that grant allocations
+(
+    id           VARCHAR(255) NOT NULL, -- the dev seed fills this with UUID()
+    name         VARCHAR(64)  NOT NULL, -- unique key (uq_alloc_sources_name); 'access' in the dev seed
+    display_name VARCHAR(128) NOT NULL, -- 'ACCESS-CI' in the dev seed
+    adapter_type VARCHAR(64)  NOT NULL, -- 'amie' in the dev seed
+    is_active    BOOLEAN      NOT NULL DEFAULT TRUE,
+    created_at   TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+    updated_at   TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE 
CURRENT_TIMESTAMP(6), -- refreshed by the server on every row update
+    PRIMARY KEY (id),
+    UNIQUE KEY uq_alloc_sources_name (name)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+CREATE TABLE IF NOT EXISTS resource_types -- registry of resource kinds that allocations are drawn against
+(
+    id           VARCHAR(255) NOT NULL, -- the dev seed fills this with UUID()
+    name         VARCHAR(64)  NOT NULL, -- unique key (uq_resource_types_name); 'b200' / 'rtx6000' in the dev seed
+    display_name VARCHAR(128) NOT NULL,
+    unit         VARCHAR(32)  NOT NULL, -- 'hours' in the dev seed
+    slurm_tres   VARCHAR(128) NULL, -- optional; not set by the dev seed. NOTE(review): presumably a Slurm TRES spec - confirm format
+    is_active    BOOLEAN      NOT NULL DEFAULT TRUE,
+    created_at   TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+    updated_at   TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE 
CURRENT_TIMESTAMP(6), -- refreshed by the server on every row update
+    PRIMARY KEY (id),
+    UNIQUE KEY uq_resource_types_name (name)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+CREATE TABLE IF NOT EXISTS awards -- credit pool granted to a project by an allocation source
+(
+    id              VARCHAR(255)  NOT NULL,
+    project_id      VARCHAR(255)  NOT NULL, -- owning project (fk_awards_project)
+    source_id       VARCHAR(255)  NOT NULL, -- granting allocation source (fk_awards_source)
+    parent_award_id VARCHAR(255)  NULL, -- self-reference (fk_awards_parent); NULL when there is no previous award
+    external_id     VARCHAR(255)  NULL, -- optional; not indexed and not unique
+    total_credits   DECIMAL(18,6) NOT NULL, -- fixed-point, 6 fractional digits
+    category        VARCHAR(64)   NOT NULL DEFAULT 'new', -- Go side defines 'new', 'supplement', 'renewal'; no CHECK here
+    start_date      DATE          NOT NULL,
+    end_date        DATE          NOT NULL, -- NOTE(review): nothing enforces end_date >= start_date
+    status          VARCHAR(32)   NOT NULL DEFAULT 'active', -- Go side defines the AwardStatus* values; no CHECK here
+    created_at      TIMESTAMP(6)  NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+    updated_at      TIMESTAMP(6)  NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON 
UPDATE CURRENT_TIMESTAMP(6), -- refreshed by the server on every row update
+    PRIMARY KEY (id),
+    CONSTRAINT fk_awards_project FOREIGN KEY (project_id) REFERENCES projects 
(id),
+    CONSTRAINT fk_awards_source  FOREIGN KEY (source_id) REFERENCES 
allocation_sources (id),
+    CONSTRAINT fk_awards_parent  FOREIGN KEY (parent_award_id) REFERENCES 
awards (id),
+    KEY idx_awards_project (project_id),
+    KEY idx_awards_status (status),
+    KEY idx_awards_parent (parent_award_id)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+CREATE TABLE IF NOT EXISTS allocations -- resource-specific draw from an award
+(
+    id               VARCHAR(255)  NOT NULL,
+    award_id         VARCHAR(255)  NOT NULL, -- parent award (fk_alloc_award)
+    resource_type_id VARCHAR(255)  NOT NULL, -- resource being allocated (fk_alloc_resource)
+    source_credits   DECIMAL(18,6) NOT NULL,
+    balance_credits  DECIMAL(18,6) NOT NULL DEFAULT 0, -- running balance; the Go model describes it as the sum of the allocation's credit_transfers
+    start_date       DATE          NOT NULL,
+    end_date         DATE          NOT NULL, -- NOTE(review): nothing enforces end_date >= start_date
+    status           VARCHAR(32)   NOT NULL DEFAULT 'active', -- Go side defines the AllocationStatus* values; no CHECK here
+    slurm_account    VARCHAR(255)  NULL, -- optional; unique when set (a MySQL UNIQUE index permits multiple NULLs)
+    created_at       TIMESTAMP(6)  NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
+    updated_at       TIMESTAMP(6)  NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON 
UPDATE CURRENT_TIMESTAMP(6), -- refreshed by the server on every row update
+    PRIMARY KEY (id),
+    CONSTRAINT fk_alloc_award    FOREIGN KEY (award_id)         REFERENCES 
awards (id),
+    CONSTRAINT fk_alloc_resource FOREIGN KEY (resource_type_id) REFERENCES 
resource_types (id),
+    UNIQUE KEY uq_alloc_slurm_account (slurm_account),
+    KEY idx_alloc_award (award_id),
+    KEY idx_alloc_status (status)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+CREATE TABLE IF NOT EXISTS credit_transfers -- ledger of credit movements; made append-only by the two triggers that follow this table
+(
+    id              VARCHAR(255)  NOT NULL,
+    allocation_id   VARCHAR(255)  NOT NULL, -- allocation the movement applies to (fk_ct_allocation)
+    transfer_type   VARCHAR(32)   NOT NULL, -- restricted by chk_ct_type
+    amount          DECIMAL(18,6) NOT NULL, -- NOTE(review): sign convention for charges is not defined in the schema - confirm
+    balance_after   DECIMAL(18,6) NOT NULL, -- presumably the allocation balance once this row is applied - confirm
+    reference_type  VARCHAR(64)   NULL, -- optional free-form pair with reference_id; no foreign key
+    reference_id    VARCHAR(255)  NULL,
+    description     VARCHAR(512)  NULL,
+    performed_by_id VARCHAR(255)  NULL, -- optional actor (fk_ct_person)
+    created_at      TIMESTAMP(6)  NOT NULL DEFAULT CURRENT_TIMESTAMP(6), -- no updated_at column: rows are never updated
+    PRIMARY KEY (id),
+    CONSTRAINT fk_ct_allocation FOREIGN KEY (allocation_id)   REFERENCES 
allocations (id),
+    CONSTRAINT fk_ct_person     FOREIGN KEY (performed_by_id) REFERENCES 
persons (id),
+    CONSTRAINT chk_ct_type CHECK (transfer_type IN 
('GRANT','USAGE_CHARGE','ADJUSTMENT','EXPIRATION')), -- NOTE(review): MySQL enforces CHECK only from 8.0.16 - confirm server version
+    KEY idx_ct_allocation (allocation_id), -- NOTE(review): leftmost prefix of idx_ct_alloc_type, so likely redundant
+    KEY idx_ct_alloc_type (allocation_id, transfer_type),
+    KEY idx_ct_created_at (created_at)
+) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
+
+CREATE TRIGGER credit_transfers_no_update BEFORE UPDATE ON credit_transfers 
FOR EACH ROW SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'credit_transfers is 
append-only; updates are not permitted'; -- aborts every UPDATE; '45000' is MySQL's generic user-defined error state. NOTE(review): no IF NOT EXISTS here, unlike the CREATE TABLE statements
+
+CREATE TRIGGER credit_transfers_no_delete BEFORE DELETE ON credit_transfers 
FOR EACH ROW SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'credit_transfers is 
append-only; deletes are not permitted'; -- aborts every DELETE; the down migration drops both triggers and then the table itself
diff --git 
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql 
b/connectors/ACCESS/AMIE-Processor/db/seed/dev-seed.sql
similarity index 57%
copy from 
connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
copy to connectors/ACCESS/AMIE-Processor/db/seed/dev-seed.sql
index 88d0ffac7..f59fb301e 100644
--- 
a/connectors/ACCESS/AMIE-Processor/db/migrations/000001_initial_schema.down.sql
+++ b/connectors/ACCESS/AMIE-Processor/db/seed/dev-seed.sql
@@ -14,10 +14,15 @@
 -- KIND, either express or implied.  See the License for the
 -- specific language governing permissions and limitations
 -- under the License.
+--
+-- Dev / Nexus site seed data. Apply after migrations on a fresh database:
+--   mysql -u admin -padmin access_ci < db/seed/dev-seed.sql
+-- Production deployments should provide their own seed reflecting the site's
+-- allocation sources and the GPU / accelerator types actually installed.
+
+INSERT INTO allocation_sources (id, name, display_name, adapter_type) VALUES -- is_active and the timestamps take their column defaults
+    (UUID(), 'access', 'ACCESS-CI', 'amie'); -- 'access' is the same value as the Go constant model.SourceNameAccess
 
-DROP TABLE IF EXISTS amie_processing_errors;
-DROP TABLE IF EXISTS amie_processing_events;
-DROP TABLE IF EXISTS amie_packets;
-DROP TABLE IF EXISTS project_memberships;
-DROP TABLE IF EXISTS projects;
-DROP TABLE IF EXISTS cluster_accounts;
+INSERT INTO resource_types (id, name, display_name, unit) VALUES -- slurm_tres is omitted and stays NULL
+    (UUID(), 'b200',    'NVIDIA B200 GPU Hours',    'hours'),
+    (UUID(), 'rtx6000', 'NVIDIA RTX 6000 GPU Hours', 'hours'); -- NOTE(review): UUID() yields a new id per run and name is unique, so re-applying the seed fails
diff --git a/connectors/ACCESS/AMIE-Processor/go.mod 
b/connectors/ACCESS/AMIE-Processor/go.mod
index 361901b18..5ea9ba61d 100644
--- a/connectors/ACCESS/AMIE-Processor/go.mod
+++ b/connectors/ACCESS/AMIE-Processor/go.mod
@@ -10,6 +10,7 @@ require (
        github.com/jmoiron/sqlx v1.4.0
        github.com/prometheus/client_golang v1.20.5
        github.com/prometheus/client_model v0.6.1
+       github.com/shopspring/decimal v1.4.0
        github.com/stretchr/testify v1.10.0
        google.golang.org/protobuf v1.36.7
        gopkg.in/yaml.v3 v3.0.1
diff --git a/connectors/ACCESS/AMIE-Processor/go.sum 
b/connectors/ACCESS/AMIE-Processor/go.sum
index 605490ad3..df2bb5faa 100644
--- a/connectors/ACCESS/AMIE-Processor/go.sum
+++ b/connectors/ACCESS/AMIE-Processor/go.sum
@@ -83,6 +83,8 @@ github.com/prometheus/procfs v0.15.1 
h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
 github.com/prometheus/procfs v0.15.1/go.mod 
h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
 github.com/rogpeppe/go-internal v1.10.0 
h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
 github.com/rogpeppe/go-internal v1.10.0/go.mod 
h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/shopspring/decimal v1.4.0 
h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
+github.com/shopspring/decimal v1.4.0/go.mod 
h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
 github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
 github.com/stretchr/objx v0.5.2/go.mod 
h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
 github.com/stretchr/testify v1.10.0 
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go 
b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
index 4e04d6659..cd9783744 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
@@ -23,6 +23,7 @@ import (
        "fmt"
 
        
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/service"
        coremodel "github.com/apache/airavata-custos/core/domain/model"
 )
 
@@ -42,6 +43,10 @@ type requestProjectCreateMembershipService interface {
        CreateMembership(ctx context.Context, tx *sql.Tx, projectID, 
clusterAccountID, role string) (*model.ProjectMembership, error)
 }
 
+type requestProjectCreateAllocationService interface { // allocation dependency of RequestProjectCreateHandler, declared at the consumer
+       CreateFromProjectPacket(ctx context.Context, tx *sql.Tx, project 
*model.Project, body map[string]any, packetID string) 
(*service.AllocationResult, error) // Handle calls this after the CREATE_MEMBERSHIP audit entry, passing its own tx
+}
+
 type requestProjectCreateAmieClient interface {
        ReplyToPacket(ctx context.Context, packetRecID int64, reply 
map[string]any) error
 }
@@ -55,6 +60,7 @@ type RequestProjectCreateHandler struct {
        accountSvc    requestProjectCreateAccountService
        projectSvc    requestProjectCreateProjectService
        membershipSvc requestProjectCreateMembershipService
+       allocationSvc requestProjectCreateAllocationService
        amieClient    requestProjectCreateAmieClient
        auditSvc      requestProjectCreateAuditService
 }
@@ -64,6 +70,7 @@ func NewRequestProjectCreateHandler(
        accountSvc requestProjectCreateAccountService,
        projectSvc requestProjectCreateProjectService,
        membershipSvc requestProjectCreateMembershipService,
+       allocationSvc requestProjectCreateAllocationService,
        amieClient requestProjectCreateAmieClient,
        auditSvc requestProjectCreateAuditService,
 ) *RequestProjectCreateHandler {
@@ -72,6 +79,7 @@ func NewRequestProjectCreateHandler(
                accountSvc:    accountSvc,
                projectSvc:    projectSvc,
                membershipSvc: membershipSvc,
+               allocationSvc: allocationSvc,
                amieClient:    amieClient,
                auditSvc:      auditSvc,
        }
@@ -153,6 +161,20 @@ func (h *RequestProjectCreateHandler) Handle(ctx 
context.Context, tx *sql.Tx, pa
                return fmt.Errorf("request_project_create: audit 
CREATE_MEMBERSHIP: %w", err)
        }
 
+       allocResult, err := h.allocationSvc.CreateFromProjectPacket(ctx, tx, 
project, body, packet.ID)
+       if err != nil {
+               return fmt.Errorf("request_project_create: creating allocation 
hierarchy: %w", err)
+       }
+       if allocResult.Transfer != nil {
+               summary := fmt.Sprintf("award %s (%s): %s credits", 
allocResult.Award.ID, allocResult.Category, 
allocResult.Award.TotalCredits.String())
+               if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID, 
coremodel.AuditGrantAward, "award", allocResult.Award.ID, summary); err != nil {
+                       return fmt.Errorf("request_project_create: audit 
GRANT_AWARD: %w", err)
+               }
+               if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID, 
coremodel.AuditPostCreditTransfer, "credit_transfer", allocResult.Transfer.ID, 
""); err != nil {
+                       return fmt.Errorf("request_project_create: audit 
POST_CREDIT_TRANSFER: %w", err)
+               }
+       }
+
        // Build and send the reply.
        replyBody := map[string]any{
                "ProjectID":         project.ID,
diff --git 
a/connectors/ACCESS/AMIE-Processor/handler/request_project_create_test.go 
b/connectors/ACCESS/AMIE-Processor/handler/request_project_create_test.go
index b5e2c969b..2d616ec5a 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_project_create_test.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_create_test.go
@@ -22,11 +22,14 @@ import (
        "database/sql"
        "testing"
 
-       
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
-       coremodel "github.com/apache/airavata-custos/core/domain/model"
+       "github.com/shopspring/decimal"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/mock"
        "github.com/stretchr/testify/require"
+
+       
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/service"
+       coremodel "github.com/apache/airavata-custos/core/domain/model"
 )
 
 // ---------------------------------------------------------------------------
@@ -85,6 +88,16 @@ func (m *mockRPCAuditService) Log(ctx context.Context, tx 
*sql.Tx, packetID, eve
        return m.Called(ctx, tx, packetID, eventID, action, entityType, 
entityID, summary).Error(0)
 }
 
+type mockRPCAllocationService struct{ mock.Mock } // testify mock standing in for the handler's allocation service
+
+func (m *mockRPCAllocationService) CreateFromProjectPacket(ctx 
context.Context, tx *sql.Tx, project *model.Project, body map[string]any, 
packetID string) (*service.AllocationResult, error) {
+       args := m.Called(ctx, tx, project, body, packetID) // records the call and fetches the values set up with On(...).Return(...)
+       if args.Get(0) == nil { // a stubbed nil result would make the type assertion below panic
+               return nil, args.Error(1)
+       }
+       return args.Get(0).(*service.AllocationResult), args.Error(1)
+}
+
 // ---------------------------------------------------------------------------
 // Helpers
 // ---------------------------------------------------------------------------
@@ -94,10 +107,11 @@ func newRPCHandler(
        as *mockRPCAccountService,
        prj *mockRPCProjectService,
        ms *mockRPCMembershipService,
+       alloc *mockRPCAllocationService,
        ac *mockRPCAmieClient,
        aud *mockRPCAuditService,
 ) *RequestProjectCreateHandler {
-       return NewRequestProjectCreateHandler(ps, as, prj, ms, ac, aud)
+       return NewRequestProjectCreateHandler(ps, as, prj, ms, alloc, ac, aud)
 }
 
 // ---------------------------------------------------------------------------
@@ -108,6 +122,7 @@ func TestRequestProjectCreateHandler_SupportsType(t 
*testing.T) {
        h := newRPCHandler(
                &mockRPCPersonService{}, &mockRPCAccountService{},
                &mockRPCProjectService{}, &mockRPCMembershipService{},
+               &mockRPCAllocationService{},
                &mockRPCAmieClient{}, &mockRPCAuditService{},
        )
        assert.Equal(t, "request_project_create", h.SupportsType())
@@ -119,16 +134,22 @@ func TestRequestProjectCreateHandler(t *testing.T) {
        tests := []struct {
                name       string
                input      map[string]any
-               setupMocks func(ps *mockRPCPersonService, as 
*mockRPCAccountService, prj *mockRPCProjectService, ms 
*mockRPCMembershipService, ac *mockRPCAmieClient, aud *mockRPCAuditService)
+               setupMocks func(ps *mockRPCPersonService, as 
*mockRPCAccountService, prj *mockRPCProjectService, ms 
*mockRPCMembershipService, alloc *mockRPCAllocationService, ac 
*mockRPCAmieClient, aud *mockRPCAuditService)
                wantErr    string
        }{
                {
                        name:  "valid packet processes successfully",
                        input: validFixture,
-                       setupMocks: func(ps *mockRPCPersonService, as 
*mockRPCAccountService, prj *mockRPCProjectService, ms 
*mockRPCMembershipService, ac *mockRPCAmieClient, aud *mockRPCAuditService) {
+                       setupMocks: func(ps *mockRPCPersonService, as 
*mockRPCAccountService, prj *mockRPCProjectService, ms 
*mockRPCMembershipService, alloc *mockRPCAllocationService, ac 
*mockRPCAmieClient, aud *mockRPCAuditService) {
                                person := &coremodel.Person{ID: "person-123"}
                                account := &model.ClusterAccount{ID: 
"account-123", Username: "hwan"}
                                project := &model.Project{ID: "project-123", 
GrantNumber: "NNT259276"}
+                               allocResult := &service.AllocationResult{
+                                       Award:      &model.Award{ID: 
"award-123", TotalCredits: decimal.NewFromInt(1), Category: 
model.AwardCategoryNew},
+                                       Allocation: &model.Allocation{ID: 
"alloc-123"},
+                                       Transfer:   &model.CreditTransfer{ID: 
"transfer-123"},
+                                       Category:   model.AwardCategoryNew,
+                               }
 
                                ps.On("FindOrCreateFromPacket", mock.Anything, 
mock.Anything, mock.Anything).Return(person, nil)
                                aud.On("Log", mock.Anything, mock.Anything, 
mock.Anything, mock.Anything, coremodel.AuditCreatePerson, mock.Anything, 
mock.Anything, mock.Anything).Return(nil)
@@ -142,6 +163,10 @@ func TestRequestProjectCreateHandler(t *testing.T) {
                                ms.On("CreateMembership", mock.Anything, 
mock.Anything, project.ID, account.ID, "PI").Return(&model.ProjectMembership{}, 
nil)
                                aud.On("Log", mock.Anything, mock.Anything, 
mock.Anything, mock.Anything, coremodel.AuditCreateMembership, mock.Anything, 
mock.Anything, mock.Anything).Return(nil)
 
+                               alloc.On("CreateFromProjectPacket", 
mock.Anything, mock.Anything, project, mock.Anything, 
mock.Anything).Return(allocResult, nil)
+                               aud.On("Log", mock.Anything, mock.Anything, 
mock.Anything, mock.Anything, coremodel.AuditGrantAward, mock.Anything, 
mock.Anything, mock.Anything).Return(nil)
+                               aud.On("Log", mock.Anything, mock.Anything, 
mock.Anything, mock.Anything, coremodel.AuditPostCreditTransfer, mock.Anything, 
mock.Anything, mock.Anything).Return(nil)
+
                                ac.On("ReplyToPacket", mock.Anything, 
int64(233497907), mock.Anything).Return(nil)
                                aud.On("Log", mock.Anything, mock.Anything, 
mock.Anything, mock.Anything, coremodel.AuditReplySent, mock.Anything, 
mock.Anything, mock.Anything).Return(nil)
                        },
@@ -195,14 +220,15 @@ func TestRequestProjectCreateHandler(t *testing.T) {
                        as := &mockRPCAccountService{}
                        prj := &mockRPCProjectService{}
                        ms := &mockRPCMembershipService{}
+                       alloc := &mockRPCAllocationService{}
                        ac := &mockRPCAmieClient{}
                        aud := &mockRPCAuditService{}
 
                        if tt.setupMocks != nil {
-                               tt.setupMocks(ps, as, prj, ms, ac, aud)
+                               tt.setupMocks(ps, as, prj, ms, alloc, ac, aud)
                        }
 
-                       h := newRPCHandler(ps, as, prj, ms, ac, aud)
+                       h := newRPCHandler(ps, as, prj, ms, alloc, ac, aud)
                        packet := &model.Packet{ID: "test-id", AmieID: 
233497907, Type: "request_project_create"}
 
                        err := h.Handle(context.Background(), nil, tt.input, 
packet, "event-1")
diff --git a/connectors/ACCESS/AMIE-Processor/main.go 
b/connectors/ACCESS/AMIE-Processor/main.go
index 3538f908f..2d8d37bb2 100644
--- a/connectors/ACCESS/AMIE-Processor/main.go
+++ b/connectors/ACCESS/AMIE-Processor/main.go
@@ -76,19 +76,25 @@ func main() {
        packetStore := store.NewPacketStore(database)
        eventStore := store.NewEventStore(database)
        errorStore := store.NewProcessingErrorStore(database)
+       allocationSourceStore := store.NewAllocationSourceStore(database)
+       resourceTypeStore := store.NewResourceTypeStore(database)
+       awardStore := store.NewAwardStore(database)
+       allocationStore := store.NewAllocationStore(database)
+       creditTransferStore := store.NewCreditTransferStore(database)
 
        personSvc := service.NewPersonService(personStore, personDNStore, 
accountStore, externalIdentityStore)
        accountSvc := service.NewUserAccountService(accountStore)
        projectSvc := service.NewProjectService(projectStore)
        membershipSvc := service.NewProjectMembershipService(membershipStore, 
projectStore, accountStore)
        auditSvc := service.NewAuditService(auditLogStore)
+       allocationSvc := service.NewAllocationService(allocationSourceStore, 
resourceTypeStore, awardStore, allocationStore, creditTransferStore)
 
        amie := amieclient.New(cfg.AMIE)
 
        met := metrics.New()
 
        router := handler.NewRouter(
-               handler.NewRequestProjectCreateHandler(personSvc, accountSvc, 
projectSvc, membershipSvc, amie, auditSvc),
+               handler.NewRequestProjectCreateHandler(personSvc, accountSvc, 
projectSvc, membershipSvc, allocationSvc, amie, auditSvc),
                handler.NewRequestAccountCreateHandler(personSvc, accountSvc, 
projectSvc, membershipSvc, amie, auditSvc),
                handler.NewRequestProjectInactivateHandler(projectSvc, 
membershipSvc, amie, auditSvc),
                handler.NewRequestProjectReactivateHandler(projectSvc, 
membershipSvc, amie, auditSvc),
diff --git a/connectors/ACCESS/AMIE-Processor/model/allocation.go 
b/connectors/ACCESS/AMIE-Processor/model/allocation.go
new file mode 100644
index 000000000..84da69ccd
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/model/allocation.go
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import (
+       "time"
+
+       "github.com/shopspring/decimal"
+)
+
+// Allocation is the resource-specific draw from an Award. BalanceCredits is 
the
+// running balance maintained by the credit ledger (sum of all CreditTransfers
+// for this allocation).
+type Allocation struct {
+       ID             string          `db:"id" json:"id"` // primary key of the allocations table
+       AwardID        string          `db:"award_id" json:"award_id"` // references awards.id
+       ResourceTypeID string          `db:"resource_type_id" 
json:"resource_type_id"` // references resource_types.id
+       SourceCredits  decimal.Decimal `db:"source_credits" 
json:"source_credits"` // DECIMAL(18,6) column
+       BalanceCredits decimal.Decimal `db:"balance_credits" 
json:"balance_credits"` // DECIMAL(18,6) column, defaults to 0
+       StartDate      time.Time       `db:"start_date" json:"start_date"` // DATE column. NOTE(review): scanning into time.Time presumably relies on parseTime=true in the DSN - confirm
+       EndDate        time.Time       `db:"end_date" json:"end_date"` // DATE column
+       Status         string          `db:"status" json:"status"` // column defaults to 'active'; see the AllocationStatus* constants
+       SlurmAccount   *string         `db:"slurm_account" 
json:"slurm_account,omitempty"` // nil when the nullable column is NULL; unique when set
+       CreatedAt      time.Time       `db:"created_at" json:"created_at"` // column defaults to CURRENT_TIMESTAMP(6)
+       UpdatedAt      time.Time       `db:"updated_at" json:"updated_at"` // refreshed by the column's ON UPDATE clause
+}
+
+const ( // values for Allocation.Status; the allocations.status column has no CHECK, so nothing in the schema restricts it to these
+       AllocationStatusActive    = "active" // same value as the column default
+       AllocationStatusExpired   = "expired"
+       AllocationStatusSuspended = "suspended"
+       AllocationStatusDepleted  = "depleted"
+)
diff --git a/connectors/ACCESS/AMIE-Processor/model/allocation_source.go 
b/connectors/ACCESS/AMIE-Processor/model/allocation_source.go
new file mode 100644
index 000000000..1c60581a5
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/model/allocation_source.go
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import "time"
+
+// AllocationSource is an upstream authority that grants allocations.
+// e.g., ('access', 'ACCESS-CI', 'amie').
+type AllocationSource struct {
+       ID          string    `db:"id" json:"id"` // primary key of the allocation_sources table
+       Name        string    `db:"name" json:"name"` // unique in the table (uq_alloc_sources_name)
+       DisplayName string    `db:"display_name" json:"display_name"`
+       AdapterType string    `db:"adapter_type" json:"adapter_type"` // 'amie' for the seeded ACCESS source
+       IsActive    bool      `db:"is_active" json:"is_active"` // column defaults to TRUE
+       CreatedAt   time.Time `db:"created_at" json:"created_at"` // column defaults to CURRENT_TIMESTAMP(6)
+       UpdatedAt   time.Time `db:"updated_at" json:"updated_at"` // refreshed by the column's ON UPDATE clause
+}
+
+const SourceNameAccess = "access" // same value as the name of the allocation_sources row inserted by db/seed/dev-seed.sql
diff --git a/connectors/ACCESS/AMIE-Processor/model/award.go 
b/connectors/ACCESS/AMIE-Processor/model/award.go
new file mode 100644
index 000000000..624287dff
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/model/award.go
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import (
+       "time"
+
+       "github.com/shopspring/decimal"
+)
+
+// Award is the approved credit pool granted to a Project from an
+// AllocationSource. Supplements/renewals create a new Award row tied to the
+// previous one via ParentAwardID.
+type Award struct {
+       ID            string          `db:"id" json:"id"` // primary key of the awards table
+       ProjectID     string          `db:"project_id" json:"project_id"` // references projects.id
+       SourceID      string          `db:"source_id" json:"source_id"` // references allocation_sources.id
+       ParentAwardID *string         `db:"parent_award_id" 
json:"parent_award_id,omitempty"` // references awards.id; nil when the column is NULL
+       ExternalID    *string         `db:"external_id" 
json:"external_id,omitempty"` // nil when the nullable column is NULL
+       TotalCredits  decimal.Decimal `db:"total_credits" json:"total_credits"` // DECIMAL(18,6) column
+       Category      string          `db:"category" json:"category"` // column defaults to 'new'; see the AwardCategory* constants
+       StartDate     time.Time       `db:"start_date" json:"start_date"` // DATE column
+       EndDate       time.Time       `db:"end_date" json:"end_date"` // DATE column
+       Status        string          `db:"status" json:"status"` // column defaults to 'active'; see the AwardStatus* constants
+       CreatedAt     time.Time       `db:"created_at" json:"created_at"` // column defaults to CURRENT_TIMESTAMP(6)
+       UpdatedAt     time.Time       `db:"updated_at" json:"updated_at"` // refreshed by the column's ON UPDATE clause
+}
+
+const ( // values for Award.Category; the awards.category column has no CHECK, so nothing in the schema restricts it to these
+       AwardCategoryNew        = "new" // same value as the column default
+       AwardCategorySupplement = "supplement"
+       AwardCategoryRenewal    = "renewal"
+)
+
+const ( // values for Award.Status; the awards.status column has no CHECK, so nothing in the schema restricts it to these
+       AwardStatusPending   = "pending"
+       AwardStatusActive    = "active" // same value as the column default
+       AwardStatusExpired   = "expired"
+       AwardStatusSuspended = "suspended"
+       AwardStatusCancelled = "cancelled"
+)
diff --git a/connectors/ACCESS/AMIE-Processor/model/credit_transfer.go 
b/connectors/ACCESS/AMIE-Processor/model/credit_transfer.go
new file mode 100644
index 000000000..6862541ee
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/model/credit_transfer.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import (
+       "time"
+
+       "github.com/shopspring/decimal"
+)
+
+// CreditTransfer is an immutable ledger event for a credit movement against an
+// Allocation. Append-only at the DB level (BEFORE UPDATE / BEFORE DELETE
+// triggers reject mutations).
+//
+// The db tags name the columns of the credit_transfers table.
+type CreditTransfer struct {
+       ID            string          `db:"id" json:"id"`
+       // AllocationID is the allocation whose balance this event changed.
+       AllocationID  string          `db:"allocation_id" json:"allocation_id"`
+       // TransferType holds one of the TransferType* values.
+       TransferType  string          `db:"transfer_type" json:"transfer_type"`
+       Amount        decimal.Decimal `db:"amount" json:"amount"`
+       // BalanceAfter is the allocation balance once this transfer is applied.
+       BalanceAfter  decimal.Decimal `db:"balance_after" json:"balance_after"`
+       // ReferenceType and ReferenceID identify what triggered the transfer,
+       // e.g. TransferReferenceAMIEPacket together with the packet ID.
+       ReferenceType *string         `db:"reference_type" 
json:"reference_type,omitempty"`
+       ReferenceID   *string         `db:"reference_id" 
json:"reference_id,omitempty"`
+       // Description is optional free text.
+       Description   *string         `db:"description" 
json:"description,omitempty"`
+       // NOTE(review): presumably the ID of the acting user; it is left nil for
+       // packet-driven grants. Confirm.
+       PerformedByID *string         `db:"performed_by_id" 
json:"performed_by_id,omitempty"`
+       // NOTE(review): the credit transfer store's INSERT does not write
+       // created_at, so the persisted value presumably comes from the database.
+       CreatedAt     time.Time       `db:"created_at" json:"created_at"`
+}
+
+// Values for CreditTransfer.TransferType.
+// NOTE(review): only TransferTypeGrant is written by the allocation service
+// in this change; the sign convention of Amount for the other types is not
+// visible here.
+const (
+       TransferTypeGrant       = "GRANT" // credits added by an award
+       TransferTypeUsageCharge = "USAGE_CHARGE"
+       TransferTypeAdjustment  = "ADJUSTMENT"
+       TransferTypeExpiration  = "EXPIRATION"
+)
+
+// Values for CreditTransfer.ReferenceType.
+const (
+       // TransferReferenceAMIEPacket marks a transfer whose ReferenceID is an
+       // AMIE packet ID.
+       TransferReferenceAMIEPacket = "amie_packet"
+)
diff --git a/connectors/ACCESS/AMIE-Processor/model/resource_type.go 
b/connectors/ACCESS/AMIE-Processor/model/resource_type.go
new file mode 100644
index 000000000..bb196d05e
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/model/resource_type.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import "time"
+
+// ResourceType maps a logical resource (CPU, GPU, storage) to its unit and the
+// SLURM TRES name used for accounting.
+//
+// The db tags name the columns of the resource_types table.
+type ResourceType struct {
+       ID          string    `db:"id" json:"id"`
+       // Name is the lookup key used by the resource type store.
+       Name        string    `db:"name" json:"name"`
+       DisplayName string    `db:"display_name" json:"display_name"`
+       Unit        string    `db:"unit" json:"unit"`
+       // SlurmTRES is nil when no TRES name is recorded for the resource.
+       SlurmTRES   *string   `db:"slurm_tres" json:"slurm_tres,omitempty"`
+       IsActive    bool      `db:"is_active" json:"is_active"`
+       CreatedAt   time.Time `db:"created_at" json:"created_at"`
+       UpdatedAt   time.Time `db:"updated_at" json:"updated_at"`
+}
+
+// Well-known values for ResourceType.Name.
+// NOTE(review): the allocation service derives names from the packet's
+// ResourceList instead of these constants, so other names can exist too.
+const (
+       ResourceTypeNameCPU     = "cpu"
+       ResourceTypeNameGPU     = "gpu"
+       ResourceTypeNameStorage = "storage"
+)
diff --git a/connectors/ACCESS/AMIE-Processor/service/allocation_service.go 
b/connectors/ACCESS/AMIE-Processor/service/allocation_service.go
new file mode 100644
index 000000000..1900d9edd
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/service/allocation_service.go
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package service
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+       "log/slog"
+       "strings"
+       "time"
+
+       "github.com/google/uuid"
+       "github.com/shopspring/decimal"
+
+       
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// allocationSourceStore is the slice of the allocation source store that
+// AllocationService needs. The service treats a nil result with a nil error
+// as "source not configured".
+type allocationSourceStore interface {
+       FindByName(ctx context.Context, name string) (*model.AllocationSource, 
error)
+}
+
+// resourceTypeStore resolves a resource name to its ResourceType inside tx.
+// NOTE(review): the name suggests the row is created when missing; the
+// implementation is outside this view, so confirm.
+type resourceTypeStore interface {
+       EnsureByName(ctx context.Context, tx *sql.Tx, name string) 
(*model.ResourceType, error)
+}
+
+// awardStore looks up the current award for a project/resource pair and
+// persists new awards. The lookup takes no transaction; a nil result with a
+// nil error means there is no current award.
+type awardStore interface {
+       FindCurrentForProjectAndResource(ctx context.Context, projectID, 
resourceTypeID string) (*model.Award, error)
+       Save(ctx context.Context, tx *sql.Tx, a *model.Award) error
+}
+
+// allocationStore finds, creates and re-balances allocations. The lookup
+// takes no transaction; Save and UpdateBalance write through tx.
+type allocationStore interface {
+       FindByAwardAndResource(ctx context.Context, awardID, resourceTypeID 
string) (*model.Allocation, error)
+       Save(ctx context.Context, tx *sql.Tx, a *model.Allocation) error
+       UpdateBalance(ctx context.Context, tx *sql.Tx, allocationID string, 
newBalance decimal.Decimal) error
+}
+
+// creditTransferStore appends ledger events through tx.
+type creditTransferStore interface {
+       Save(ctx context.Context, tx *sql.Tx, t *model.CreditTransfer) error
+}
+
+// AllocationResult is what the allocation service returns to the caller (the
+// AMIE handler). Award, Allocation and Transfer are all populated when a grant
+// was recorded; for supplements Allocation is the existing, topped-up row
+// rather than a new one. When a "new" packet duplicates an active award, only
+// Award (the existing one) and Category are set.
+type AllocationResult struct {
+       Award      *model.Award
+       Allocation *model.Allocation
+       Transfer   *model.CreditTransfer
+       Category   string // resolved category: "new", "supplement", "renewal"
+}
+
+// AllocationService turns AMIE project packets into Award, Allocation and
+// CreditTransfer rows. It holds only its store dependencies.
+type AllocationService struct {
+       sources       allocationSourceStore
+       resourceTypes resourceTypeStore
+       awards        awardStore
+       allocations   allocationStore
+       transfers     creditTransferStore
+}
+
+// NewAllocationService wires an AllocationService to the given stores. The
+// arguments are stored as passed; none is checked for nil.
+func NewAllocationService(
+       sources allocationSourceStore,
+       resourceTypes resourceTypeStore,
+       awards awardStore,
+       allocations allocationStore,
+       transfers creditTransferStore,
+) *AllocationService {
+       return &AllocationService{
+               sources:       sources,
+               resourceTypes: resourceTypes,
+               awards:        awards,
+               allocations:   allocations,
+               transfers:     transfers,
+       }
+}
+
+// CreateFromProjectPacket records the credit grant described by an AMIE
+// project packet. It branches on the packet's AllocationType:
+//   - "new" (or empty/unknown): new Award + new Allocation + GRANT
+//   - "supplement": new Award (chained) + top up existing Allocation + GRANT
+//   - "renewal": new Award (chained) + new Allocation + GRANT
+//
+// A supplement or renewal for a project that has no current Award for the
+// resource is downgraded to "new" with a warning. If AllocationType resolves
+// to "new" but the project already has an active Award for this resource, the
+// call is a no-op and the returned AllocationResult has nil Allocation and
+// Transfer so the caller can audit it as a duplicate.
+//
+// body is the decoded packet body. It must carry a positive
+// ServiceUnitsAllocated, a StartDate, an EndDate that is not before StartDate,
+// and a non-empty ResourceList (only its first entry is used). All writes go
+// through tx; committing or rolling back is left to the caller. packetID is
+// stored as the transfer's reference ID.
+//
+// An error is returned when project is nil, a required field is missing or
+// invalid, the ACCESS source is not configured, a supplement's parent award
+// has no allocation for the resource, or any store call fails.
+//
+// NOTE(review): the current award and allocation are read through store
+// methods that take no transaction, and the supplement balance is written as
+// an absolute value. Two concurrent supplements for one allocation could
+// therefore lose an update; confirm the caller serializes packets per project.
+func (s *AllocationService) CreateFromProjectPacket(
+       ctx context.Context,
+       tx *sql.Tx,
+       project *model.Project,
+       body map[string]any,
+       packetID string,
+) (*AllocationResult, error) {
+       // Reject a nil project up front instead of panicking on project.ID below.
+       if project == nil {
+               return nil, fmt.Errorf("allocation_service: project is required")
+       }
+
+       serviceUnits, ok := parseDecimal(body["ServiceUnitsAllocated"])
+       if !ok {
+               return nil, fmt.Errorf("allocation_service: ServiceUnitsAllocated is required")
+       }
+       if serviceUnits.LessThanOrEqual(decimal.Zero) {
+               return nil, fmt.Errorf("allocation_service: ServiceUnitsAllocated must be positive, got %s", serviceUnits)
+       }
+
+       startDate, err := parseDate(body["StartDate"])
+       if err != nil {
+               return nil, fmt.Errorf("allocation_service: parsing StartDate: %w", err)
+       }
+       endDate, err := parseDate(body["EndDate"])
+       if err != nil {
+               return nil, fmt.Errorf("allocation_service: parsing EndDate: %w", err)
+       }
+       // An inverted window would create an award that is never in effect.
+       if endDate.Before(startDate) {
+               return nil, fmt.Errorf("allocation_service: EndDate %s is before StartDate %s",
+                       endDate.Format(time.RFC3339), startDate.Format(time.RFC3339))
+       }
+
+       resourceName := firstResource(body)
+       if resourceName == "" {
+               return nil, fmt.Errorf("allocation_service: ResourceList must contain at least one resource")
+       }
+       resourceType, err := s.resourceTypes.EnsureByName(ctx, tx, resourceName)
+       if err != nil {
+               return nil, fmt.Errorf("allocation_service: ensuring resource type %s: %w", resourceName, err)
+       }
+
+       source, err := s.sources.FindByName(ctx, model.SourceNameAccess)
+       if err != nil {
+               return nil, fmt.Errorf("allocation_service: looking up source %s: %w", model.SourceNameAccess, err)
+       }
+       if source == nil {
+               return nil, fmt.Errorf("allocation_service: source %s not configured", model.SourceNameAccess)
+       }
+
+       category := normalizeAllocationType(getString(body, "AllocationType"))
+
+       existing, err := s.awards.FindCurrentForProjectAndResource(ctx, project.ID, resourceType.ID)
+       if err != nil {
+               return nil, fmt.Errorf("allocation_service: finding existing award: %w", err)
+       }
+
+       switch category {
+       case model.AwardCategorySupplement, model.AwardCategoryRenewal:
+               if existing == nil {
+                       slog.WarnContext(ctx, "allocation_service: no existing award found for supplement/renewal; treating as new",
+                               "project_id", project.ID,
+                               "resource", resourceName,
+                               "category", category)
+                       category = model.AwardCategoryNew
+               }
+       default:
+               if existing != nil {
+                       slog.WarnContext(ctx, "allocation_service: project already has an active award for this resource; skipping duplicate 'new'",
+                               "project_id", project.ID,
+                               "resource", resourceName,
+                               "existing_award_id", existing.ID,
+                               "packet_id", packetID)
+                       return &AllocationResult{Award: existing, Category: category}, nil
+               }
+       }
+
+       award := &model.Award{
+               ID:           uuid.NewString(),
+               ProjectID:    project.ID,
+               SourceID:     source.ID,
+               TotalCredits: serviceUnits,
+               Category:     category,
+               StartDate:    startDate,
+               EndDate:      endDate,
+               Status:       model.AwardStatusActive,
+       }
+       // existing is non-nil only for supplements and renewals at this point.
+       if existing != nil {
+               parentID := existing.ID
+               award.ParentAwardID = &parentID
+       }
+       if err := s.awards.Save(ctx, tx, award); err != nil {
+               return nil, fmt.Errorf("allocation_service: saving award: %w", err)
+       }
+
+       // allocation ends up as the row the GRANT is booked against, with its
+       // BalanceCredits already reflecting the grant.
+       var allocation *model.Allocation
+       if category == model.AwardCategorySupplement {
+               // A supplement tops up the parent award's allocation instead of
+               // creating one for the new award.
+               allocation, err = s.allocations.FindByAwardAndResource(ctx, existing.ID, resourceType.ID)
+               if err != nil {
+                       return nil, fmt.Errorf("allocation_service: finding existing allocation: %w", err)
+               }
+               if allocation == nil {
+                       return nil, fmt.Errorf("allocation_service: existing award %s has no allocation for resource %s", existing.ID, resourceName)
+               }
+               toppedUp := allocation.BalanceCredits.Add(serviceUnits)
+               if err := s.allocations.UpdateBalance(ctx, tx, allocation.ID, toppedUp); err != nil {
+                       return nil, fmt.Errorf("allocation_service: updating allocation balance: %w", err)
+               }
+               allocation.BalanceCredits = toppedUp
+       } else {
+               allocation = &model.Allocation{
+                       ID:             uuid.NewString(),
+                       AwardID:        award.ID,
+                       ResourceTypeID: resourceType.ID,
+                       SourceCredits:  serviceUnits,
+                       BalanceCredits: serviceUnits,
+                       StartDate:      startDate,
+                       EndDate:        endDate,
+                       Status:         model.AllocationStatusActive,
+               }
+               if err := s.allocations.Save(ctx, tx, allocation); err != nil {
+                       return nil, fmt.Errorf("allocation_service: saving allocation: %w", err)
+               }
+       }
+
+       refType := model.TransferReferenceAMIEPacket
+       refID := packetID
+       description := fmt.Sprintf("GRANT from %s award (%s)", source.Name, category)
+       transfer := &model.CreditTransfer{
+               ID:            uuid.NewString(),
+               AllocationID:  allocation.ID,
+               TransferType:  model.TransferTypeGrant,
+               Amount:        serviceUnits,
+               BalanceAfter:  allocation.BalanceCredits,
+               ReferenceType: &refType,
+               ReferenceID:   &refID,
+               Description:   &description,
+               CreatedAt:     time.Now().UTC(),
+       }
+       if err := s.transfers.Save(ctx, tx, transfer); err != nil {
+               return nil, fmt.Errorf("allocation_service: saving credit transfer: %w", err)
+       }
+
+       return &AllocationResult{
+               Award:      award,
+               Allocation: allocation,
+               Transfer:   transfer,
+               Category:   category,
+       }, nil
+}
+
+// normalizeAllocationType converts the AMIE AllocationType value into one of
+// the internal award categories. Matching ignores case, surrounding
+// whitespace, and any spaces, underscores or hyphens. Every value that is not
+// a recognized supplement or renewal spelling, the empty string included,
+// maps to "new", so unfamiliar packets are processed rather than rejected.
+func normalizeAllocationType(raw string) string {
+       separators := strings.NewReplacer(" ", "", "_", "", "-", "")
+       key := separators.Replace(strings.ToLower(strings.TrimSpace(raw)))
+
+       if key == "supplement" || key == "supplemental" {
+               return model.AwardCategorySupplement
+       }
+       if key == "renewal" || key == "renew" {
+               return model.AwardCategoryRenewal
+       }
+       return model.AwardCategoryNew
+}
+
+// firstResource returns the first entry of the packet's ResourceList,
+// lowercased and trimmed. ResourceList may be a []any (AMIE-conformant JSON),
+// a []string (a body assembled in Go code), or a bare string (mock-server
+// simplification). It returns "" when the field is missing, empty, of another
+// type, or when the first element of a []any is not a string.
+func firstResource(body map[string]any) string {
+       var first string
+       switch v := body["ResourceList"].(type) {
+       case []any:
+               if len(v) > 0 {
+                       // A non-string element leaves first empty on purpose.
+                       first, _ = v[0].(string)
+               }
+       case []string:
+               if len(v) > 0 {
+                       first = v[0]
+               }
+       case string:
+               first = v
+       }
+       return strings.ToLower(strings.TrimSpace(first))
+}
+
+// parseDecimal converts a loosely typed packet value into a decimal. It
+// accepts float64, float32, int, int32, int64 and numeric strings (surrounding
+// whitespace is ignored). The boolean is false when v is nil, is a non-finite
+// float, is of any other type, or is a string that is not a decimal number.
+func parseDecimal(v any) (decimal.Decimal, bool) {
+       switch x := v.(type) {
+       case nil:
+               return decimal.Zero, false
+       case float64:
+               // decimal.NewFromFloat panics on NaN and ±Inf. x-x is 0 only for
+               // finite x (it is NaN otherwise), so this rejects both.
+               if x-x != 0 {
+                       return decimal.Zero, false
+               }
+               return decimal.NewFromFloat(x), true
+       case float32:
+               if x-x != 0 {
+                       return decimal.Zero, false
+               }
+               // NewFromFloat32 keeps the float32's own round-trip digits;
+               // widening to float64 first would surface binary noise
+               // (0.1 becomes 0.10000000149...).
+               return decimal.NewFromFloat32(x), true
+       case int:
+               return decimal.NewFromInt(int64(x)), true
+       case int32:
+               return decimal.NewFromInt(int64(x)), true
+       case int64:
+               return decimal.NewFromInt(x), true
+       case string:
+               d, err := decimal.NewFromString(strings.TrimSpace(x))
+               if err != nil {
+                       return decimal.Zero, false
+               }
+               return d, true
+       default:
+               return decimal.Zero, false
+       }
+}
+
+// parseDate accepts dates as "YYYY-MM-DD", RFC3339, or time.Time directly.
+// Strings are trimmed of surrounding whitespace before parsing. A nil value or
+// a blank string is reported as a missing date; any other unparseable string
+// or unsupported type yields an error naming the offending input.
+func parseDate(v any) (time.Time, error) {
+       switch x := v.(type) {
+       case nil:
+               return time.Time{}, fmt.Errorf("date is required")
+       case time.Time:
+               return x, nil
+       case string:
+               trimmed := strings.TrimSpace(x)
+               if trimmed == "" {
+                       return time.Time{}, fmt.Errorf("date is required")
+               }
+               // Try the plain date first, then the full timestamp form.
+               for _, layout := range []string{"2006-01-02", time.RFC3339} {
+                       if t, err := time.Parse(layout, trimmed); err == nil {
+                               return t, nil
+                       }
+               }
+               // %q keeps odd or invisible characters visible in the message.
+               return time.Time{}, fmt.Errorf("unrecognized date format: %q", x)
+       default:
+               return time.Time{}, fmt.Errorf("unsupported date type: %T", v)
+       }
+}
+
+// getString looks up key in body and returns the value when it is a string.
+// A missing key or a value of any other type yields "".
+func getString(body map[string]any, key string) string {
+       if s, ok := body[key].(string); ok {
+               return s
+       }
+       return ""
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/allocation_source_store.go 
b/connectors/ACCESS/AMIE-Processor/store/allocation_source_store.go
new file mode 100644
index 000000000..418ee9dce
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/store/allocation_source_store.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+
+       "github.com/jmoiron/sqlx"
+
+       
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// allocationSourceColumns is the column list selected when loading a
+// model.AllocationSource from allocation_sources.
+const allocationSourceColumns = `id, name, display_name, adapter_type, 
is_active, created_at, updated_at`
+
+// mariaDBAllocationSourceStore is the sqlx-backed AllocationSourceStore.
+type mariaDBAllocationSourceStore struct {
+       db *sqlx.DB
+}
+
+// NewAllocationSourceStore returns an AllocationSourceStore that queries db.
+func NewAllocationSourceStore(db *sqlx.DB) AllocationSourceStore {
+       return &mariaDBAllocationSourceStore{db: db}
+}
+
+// FindByName loads the allocation source whose name equals name. A missing
+// row is reported as (nil, nil) rather than as an error.
+func (s *mariaDBAllocationSourceStore) FindByName(ctx context.Context, name string) (*model.AllocationSource, error) {
+       const query = `SELECT ` + allocationSourceColumns + ` FROM allocation_sources WHERE name = ? LIMIT 1`
+
+       found := new(model.AllocationSource)
+       err := s.db.GetContext(ctx, found, query, name)
+       switch {
+       case errors.Is(err, sql.ErrNoRows):
+               return nil, nil
+       case err != nil:
+               return nil, err
+       }
+       return found, nil
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/allocation_store.go 
b/connectors/ACCESS/AMIE-Processor/store/allocation_store.go
new file mode 100644
index 000000000..c32a95bd1
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/store/allocation_store.go
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+
+       "github.com/jmoiron/sqlx"
+       "github.com/shopspring/decimal"
+
+       
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// allocationColumns is the column list selected when loading a
+// model.Allocation from allocations.
+const allocationColumns = `id, award_id, resource_type_id, source_credits, 
balance_credits, start_date, end_date, status, slurm_account, created_at, 
updated_at`
+
+// mariaDBAllocationStore is the sqlx-backed AllocationStore. Reads go through
+// db; writes go through the transaction passed to each method.
+type mariaDBAllocationStore struct {
+       db *sqlx.DB
+}
+
+// NewAllocationStore returns an AllocationStore that queries db.
+func NewAllocationStore(db *sqlx.DB) AllocationStore {
+       return &mariaDBAllocationStore{db: db}
+}
+
+// FindByID loads the allocation with the given id. A missing row is reported
+// as (nil, nil) rather than as an error.
+func (s *mariaDBAllocationStore) FindByID(ctx context.Context, id string) (*model.Allocation, error) {
+       const query = `SELECT ` + allocationColumns + ` FROM allocations WHERE id = ?`
+
+       found := new(model.Allocation)
+       err := s.db.GetContext(ctx, found, query, id)
+       switch {
+       case errors.Is(err, sql.ErrNoRows):
+               return nil, nil
+       case err != nil:
+               return nil, err
+       }
+       return found, nil
+}
+
+// FindByAwardAndResource loads one allocation belonging to awardID for the
+// given resource type. A missing row is reported as (nil, nil). The query has
+// no ORDER BY, so if several rows match, which one is returned is up to the
+// database.
+func (s *mariaDBAllocationStore) FindByAwardAndResource(ctx context.Context, awardID, resourceTypeID string) (*model.Allocation, error) {
+       const query = `SELECT ` + allocationColumns + `
+                FROM allocations
+                WHERE award_id = ? AND resource_type_id = ?
+                LIMIT 1`
+
+       found := new(model.Allocation)
+       err := s.db.GetContext(ctx, found, query, awardID, resourceTypeID)
+       switch {
+       case errors.Is(err, sql.ErrNoRows):
+               return nil, nil
+       case err != nil:
+               return nil, err
+       }
+       return found, nil
+}
+
+func (s *mariaDBAllocationStore) Save(ctx context.Context, tx *sql.Tx, a 
*model.Allocation) error {
+       _, err := tx.ExecContext(ctx,
+               `INSERT INTO allocations
+                  (id, award_id, resource_type_id, source_credits, 
balance_credits,
+                   start_date, end_date, status, slurm_account)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+               a.ID, a.AwardID, a.ResourceTypeID, a.SourceCredits, 
a.BalanceCredits,
+               a.StartDate, a.EndDate, a.Status, a.SlurmAccount)
+       return err
+}
+
+func (s *mariaDBAllocationStore) UpdateBalance(ctx context.Context, tx 
*sql.Tx, allocationID string, newBalance decimal.Decimal) error {
+       _, err := tx.ExecContext(ctx,
+               `UPDATE allocations SET balance_credits = ? WHERE id = ?`,
+               newBalance, allocationID)
+       return err
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/award_store.go 
b/connectors/ACCESS/AMIE-Processor/store/award_store.go
new file mode 100644
index 000000000..56b1e5b3c
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/store/award_store.go
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+
+       "github.com/jmoiron/sqlx"
+
+       
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// awardColumns is the column list selected when loading a model.Award from
+// awards without a table alias.
+const awardColumns = `id, project_id, source_id, parent_award_id, external_id, 
total_credits, category, start_date, end_date, status, created_at, updated_at`
+
+// mariaDBAwardStore is the sqlx-backed AwardStore. Reads go through db;
+// writes go through the transaction passed to Save.
+type mariaDBAwardStore struct {
+       db *sqlx.DB
+}
+
+// NewAwardStore returns an AwardStore that queries db.
+func NewAwardStore(db *sqlx.DB) AwardStore {
+       return &mariaDBAwardStore{db: db}
+}
+
+// FindByID loads the award with the given id. A missing row is reported as
+// (nil, nil) rather than as an error.
+func (s *mariaDBAwardStore) FindByID(ctx context.Context, id string) (*model.Award, error) {
+       const query = `SELECT ` + awardColumns + ` FROM awards WHERE id = ?`
+
+       found := new(model.Award)
+       err := s.db.GetContext(ctx, found, query, id)
+       switch {
+       case errors.Is(err, sql.ErrNoRows):
+               return nil, nil
+       case err != nil:
+               return nil, err
+       }
+       return found, nil
+}
+
+// FindCurrentForProjectAndResource returns the most recently created active
+// Award for (project, resource_type). Used to determine the parent award when
+// processing supplements or renewals.
+func (s *mariaDBAwardStore) FindCurrentForProjectAndResource(ctx 
context.Context, projectID, resourceTypeID string) (*model.Award, error) {
+       var a model.Award
+       err := s.db.GetContext(ctx, &a,
+               `SELECT `+awardColumnsWithPrefix("a")+`
+                FROM awards a
+                INNER JOIN allocations al ON al.award_id = a.id
+                WHERE a.project_id = ?
+                  AND al.resource_type_id = ?
+                  AND a.status = ?
+                ORDER BY a.created_at DESC
+                LIMIT 1`,
+               projectID, resourceTypeID, model.AwardStatusActive)
+       if err != nil {
+               if errors.Is(err, sql.ErrNoRows) {
+                       return nil, nil
+               }
+               return nil, err
+       }
+       return &a, nil
+}
+
+func (s *mariaDBAwardStore) Save(ctx context.Context, tx *sql.Tx, a 
*model.Award) error {
+       _, err := tx.ExecContext(ctx,
+               `INSERT INTO awards
+                  (id, project_id, source_id, parent_award_id, external_id, 
total_credits,
+                   category, start_date, end_date, status)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+               a.ID, a.ProjectID, a.SourceID, a.ParentAwardID, a.ExternalID, 
a.TotalCredits,
+               a.Category, a.StartDate, a.EndDate, a.Status)
+       return err
+}
+
+// awardColumnsWithPrefix returns the award column list with every column
+// qualified as "<alias>.<column>", joined by ", ", for use in queries that
+// join awards with other tables.
+func awardColumnsWithPrefix(alias string) string {
+       columns := [...]string{
+               "id", "project_id", "source_id", "parent_award_id", "external_id",
+               "total_credits", "category", "start_date", "end_date", "status",
+               "created_at", "updated_at",
+       }
+
+       qualified := alias + "." + columns[0]
+       for _, column := range columns[1:] {
+               qualified += ", " + alias + "." + column
+       }
+       return qualified
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/credit_transfer_store.go 
b/connectors/ACCESS/AMIE-Processor/store/credit_transfer_store.go
new file mode 100644
index 000000000..55b27cdb8
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/store/credit_transfer_store.go
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "context"
+       "database/sql"
+
+       "github.com/jmoiron/sqlx"
+       "github.com/shopspring/decimal"
+
+       
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// mariaDBCreditTransferStore is the sqlx-backed CreditTransferStore. Reads go
+// through db; Save writes through the transaction it is given.
+type mariaDBCreditTransferStore struct {
+       db *sqlx.DB
+}
+
+// NewCreditTransferStore returns a CreditTransferStore that queries db.
+func NewCreditTransferStore(db *sqlx.DB) CreditTransferStore {
+       return &mariaDBCreditTransferStore{db: db}
+}
+
+func (s *mariaDBCreditTransferStore) Save(ctx context.Context, tx *sql.Tx, t 
*model.CreditTransfer) error {
+       _, err := tx.ExecContext(ctx,
+               `INSERT INTO credit_transfers
+                  (id, allocation_id, transfer_type, amount, balance_after,
+                   reference_type, reference_id, description, performed_by_id)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
+               t.ID, t.AllocationID, t.TransferType, t.Amount, t.BalanceAfter,
+               t.ReferenceType, t.ReferenceID, t.Description, t.PerformedByID)
+       return err
+}
+
+// SumByAllocation returns the sum of amount over every credit transfer
+// recorded for allocationID, or zero when there are none. On a query error
+// the returned decimal is whatever was scanned so far, alongside the error.
+func (s *mariaDBCreditTransferStore) SumByAllocation(ctx context.Context, allocationID string) (decimal.Decimal, error) {
+       const query = `SELECT COALESCE(SUM(amount), 0) FROM credit_transfers WHERE allocation_id = ?`
+
+       var total decimal.Decimal
+       if err := s.db.GetContext(ctx, &total, query, allocationID); err != nil {
+               return total, err
+       }
+       return total, nil
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/resource_type_store.go 
b/connectors/ACCESS/AMIE-Processor/store/resource_type_store.go
new file mode 100644
index 000000000..5aa001675
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/store/resource_type_store.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "time"
+
+       "github.com/google/uuid"
+       "github.com/jmoiron/sqlx"
+
+       
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+)
+
+// resourceTypeColumns is the column list spliced into SELECT statements against
+// the resource_types table (see FindByName).
+const resourceTypeColumns = `id, name, display_name, unit, slurm_tres, 
is_active, created_at, updated_at`
+
+// mariaDBResourceTypeStore is the SQL-backed value that NewResourceTypeStore
+// hands out as a ResourceTypeStore.
+type mariaDBResourceTypeStore struct {
+       // db is the handle used for lookups issued outside a caller-supplied
+       // transaction (see FindByName).
+       db *sqlx.DB
+}
+
+// NewResourceTypeStore builds a ResourceTypeStore that issues its lookups
+// against db.
+func NewResourceTypeStore(db *sqlx.DB) ResourceTypeStore {
+       store := new(mariaDBResourceTypeStore)
+       store.db = db
+       return store
+}
+
+// FindByName loads the resource_types row whose name column equals name. It
+// returns (nil, nil) when no row matches and (nil, err) for any other query
+// failure. The query runs on the store's db handle, not inside a caller
+// transaction.
+func (s *mariaDBResourceTypeStore) FindByName(ctx context.Context, name string) (*model.ResourceType, error) {
+       const lookup = `SELECT ` + resourceTypeColumns + ` FROM resource_types WHERE name = ? LIMIT 1`
+
+       found := new(model.ResourceType)
+       switch err := s.db.GetContext(ctx, found, lookup, name); {
+       case err == nil:
+               return found, nil
+       case errors.Is(err, sql.ErrNoRows):
+               // A missing row is reported as a nil result, not as an error.
+               return nil, nil
+       default:
+               return nil, err
+       }
+}
+
+func (s *mariaDBResourceTypeStore) EnsureByName(ctx context.Context, tx 
*sql.Tx, name string) (*model.ResourceType, error) {
+       if rt, err := s.FindByName(ctx, name); err != nil {
+               return nil, err
+       } else if rt != nil {
+               return rt, nil
+       }
+
+       id := uuid.NewString()
+       now := time.Now().UTC()
+       if _, err := tx.ExecContext(ctx,
+               `INSERT INTO resource_types (id, name, display_name, unit) 
VALUES (?, ?, ?, 'hours')
+                ON DUPLICATE KEY UPDATE id = id`,
+               id, name, name); err != nil {
+               return nil, err
+       }
+       return &model.ResourceType{
+               ID:          id,
+               Name:        name,
+               DisplayName: name,
+               Unit:        "hours",
+               IsActive:    true,
+               CreatedAt:   now,
+               UpdatedAt:   now,
+       }, nil
+}
diff --git a/connectors/ACCESS/AMIE-Processor/store/stores.go 
b/connectors/ACCESS/AMIE-Processor/store/stores.go
index e02b60274..c8a05124a 100644
--- a/connectors/ACCESS/AMIE-Processor/store/stores.go
+++ b/connectors/ACCESS/AMIE-Processor/store/stores.go
@@ -21,6 +21,8 @@ import (
        "context"
        "database/sql"
 
+       "github.com/shopspring/decimal"
+
        
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
 )
 
@@ -49,3 +51,36 @@ type MembershipStore interface {
        Save(ctx context.Context, tx *sql.Tx, m *model.ProjectMembership) error
        Update(ctx context.Context, tx *sql.Tx, m *model.ProjectMembership) 
error
 }
+
+// AllocationSourceStore is the read-only lookup interface for
+// model.AllocationSource records.
+type AllocationSourceStore interface {
+       // FindByName looks up an allocation source by name.
+       // NOTE(review): how a missing source is reported (nil result vs. error)
+       // is decided by the implementation — confirm before relying on it.
+       FindByName(ctx context.Context, name string) (*model.AllocationSource, 
error)
+}
+
+// ResourceTypeStore is the lookup/auto-registration interface for
+// model.ResourceType records.
+type ResourceTypeStore interface {
+       // FindByName looks up a resource type by name. The MariaDB implementation
+       // in this package returns (nil, nil) when no row matches.
+       FindByName(ctx context.Context, name string) (*model.ResourceType, 
error)
+       // EnsureByName returns the resource_type for `name`, creating one with
+       // default attributes if it doesn't exist. Used by the AMIE handler to
+       // auto-register site-specific resources observed in incoming packets;
+       // site admins can later customize display_name/unit/slurm_tres.
+       EnsureByName(ctx context.Context, tx *sql.Tx, name string) 
(*model.ResourceType, error)
+}
+
+// AwardStore is the persistence interface for model.Award records.
+// NOTE(review): the implementation is not shown next to this interface; the
+// per-method notes below are inferred from the signatures — confirm.
+type AwardStore interface {
+       // FindByID presumably looks up a single award by its id.
+       FindByID(ctx context.Context, id string) (*model.Award, error)
+       // FindCurrentForProjectAndResource presumably returns the award in effect
+       // for the given project and resource type; what counts as "current" is
+       // defined by the implementation — TODO confirm.
+       FindCurrentForProjectAndResource(ctx context.Context, projectID, 
resourceTypeID string) (*model.Award, error)
+       // Save takes the caller's tx, so the write presumably joins that
+       // transaction.
+       Save(ctx context.Context, tx *sql.Tx, a *model.Award) error
+}
+
+// AllocationStore is the persistence interface for model.Allocation records.
+// NOTE(review): the implementation is not shown next to this interface; the
+// per-method notes below are inferred from the signatures — confirm.
+type AllocationStore interface {
+       // FindByID presumably looks up a single allocation by its id.
+       FindByID(ctx context.Context, id string) (*model.Allocation, error)
+       // FindByAwardAndResource presumably looks up the allocation belonging to
+       // the given award and resource type.
+       FindByAwardAndResource(ctx context.Context, awardID, resourceTypeID 
string) (*model.Allocation, error)
+       // Save takes the caller's tx, so the write presumably joins that
+       // transaction.
+       Save(ctx context.Context, tx *sql.Tx, a *model.Allocation) error
+       // UpdateBalance presumably sets the stored balance of allocationID to
+       // newBalance within tx.
+       UpdateBalance(ctx context.Context, tx *sql.Tx, allocationID string, 
newBalance decimal.Decimal) error
+}
+
+// CreditTransferStore is intentionally write-only at the interface level: no
+// Update or Delete methods, mirroring the DB-level append-only triggers.
+type CreditTransferStore interface {
+       // Save records the transfer t using the caller's tx; the MariaDB
+       // implementation in this package inserts one credit_transfers row.
+       Save(ctx context.Context, tx *sql.Tx, t *model.CreditTransfer) error
+       // SumByAllocation returns the total of all transfer amounts recorded for
+       // allocationID; the MariaDB implementation in this package yields zero
+       // when the allocation has no transfers.
+       SumByAllocation(ctx context.Context, allocationID string) 
(decimal.Decimal, error)
+}
diff --git a/core/domain/model/audit_log.go b/core/domain/model/audit_log.go
index fc6c6b650..99164deb4 100644
--- a/core/domain/model/audit_log.go
+++ b/core/domain/model/audit_log.go
@@ -39,6 +39,9 @@ const (
        AuditReactivateMembership AuditAction = "REACTIVATE_MEMBERSHIP"
        AuditReplySent            AuditAction = "REPLY_SENT"
        AuditTransactionComplete  AuditAction = "TRANSACTION_COMPLETE"
+
+       AuditGrantAward         AuditAction = "GRANT_AWARD"
+       AuditPostCreditTransfer AuditAction = "POST_CREDIT_TRANSFER"
 )
 
 // AuditLog is a generic audit record. Connector- or surface-specific context

Reply via email to