This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch access-integration in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 2f7de87c2cd737fbaacfd8524633607ebb0b8ce1 Author: lahiruj <[email protected]> AuthorDate: Tue May 19 02:14:25 2026 -0400 Add connector registry pattern with per-connector migrations --- cmd/server/main.go | 19 ++++++++-- internal/db/migrate.go | 30 +++++++++++++++ pkg/connectors/registry.go | 92 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 138 insertions(+), 3 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 978d5cc92..d6d59497b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -31,6 +31,7 @@ import ( "github.com/apache/airavata-custos/internal/db" "github.com/apache/airavata-custos/internal/server" + "github.com/apache/airavata-custos/pkg/connectors" "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/service" ) @@ -73,6 +74,21 @@ func run() error { eventBus := events.New() svc := service.New(database, eventBus) + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + for _, c := range connectors.All() { + if migFS, dir := c.Migrations(); migFS != nil { + if err := db.MigrateConnectorFS(database, migFS, dir, c.Name()); err != nil { + return err + } + } + if err := c.Start(ctx, connectors.Deps{DB: database, Service: svc, Bus: eventBus}); err != nil { + return err + } + } + handler := server.LoggingMiddleware(server.New(svc)) httpServer := &http.Server{ @@ -84,9 +100,6 @@ func run() error { IdleTimeout: 120 * time.Second, } - ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) - defer stop() - serverErr := make(chan error, 1) go func() { slog.Info("http server listening", "addr", addr) diff --git a/internal/db/migrate.go b/internal/db/migrate.go index 446c26b9e..b447eab74 100644 --- a/internal/db/migrate.go +++ b/internal/db/migrate.go @@ -20,6 +20,7 @@ package db import ( "errors" "fmt" + "io/fs" "log/slog" "github.com/golang-migrate/migrate/v4" @@ -53,3 +54,32 @@ func MigrateEmbedded(database *sqlx.DB) error { slog.Info("database migrations applied successfully") return nil } + +// MigrateConnectorFS applies migrations from the supplied embed.FS to the +// supplied database, tracking version state in a per-connector table named +// schema_migrations_<name>. Connectors invoke this via the host so version +// sequences never collide across connectors or with core. +func MigrateConnectorFS(database *sqlx.DB, src fs.FS, dir, name string) error { + driver, err := mysql.WithInstance(database.DB, &mysql.Config{ + MigrationsTable: "schema_migrations_" + name, + }) + if err != nil { + return fmt.Errorf("create migration driver for %s: %w", name, err) + } + + source, err := iofs.New(src, dir) + if err != nil { + return fmt.Errorf("create migration source for %s: %w", name, err) + } + + m, err := migrate.NewWithInstance("iofs", source, "mysql", driver) + if err != nil { + return fmt.Errorf("create migrator for %s: %w", name, err) + } + + if err := m.Up(); err != nil && !errors.Is(err, migrate.ErrNoChange) { + return fmt.Errorf("run migrations for %s: %w", name, err) + } + slog.Info("connector migrations applied", "connector", name) + return nil +} diff --git a/pkg/connectors/registry.go b/pkg/connectors/registry.go new file mode 100644 index 000000000..9c807d266 --- /dev/null +++ b/pkg/connectors/registry.go @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package connectors is the registry through which external-system connectors +// (ACCESS AMIE, NAIRR, etc.) plug into the core binary. +// +// A deployment selects which connectors are bundled by importing them +// blank in cmd/server/connectors.go. Each connector's init() function calls +// Register, contributing both its embedded migrations and its Start function +// to the runtime. +package connectors + +import ( + "context" + "io/fs" + "sort" + "sync" + + "github.com/jmoiron/sqlx" + + "github.com/apache/airavata-custos/pkg/events" + "github.com/apache/airavata-custos/pkg/service" +) + +// Deps is the bundle the host hands to every connector at startup. +type Deps struct { + DB *sqlx.DB + Service *service.Service + Bus *events.Bus +} + +// Connector is a long-running subsystem that integrates an external system +// with core. The host applies its migrations first, then calls Start. +type Connector interface { + // Name returns a unique slug for this connector (used to namespace the + // per-connector schema_migrations table). + Name() string + // Migrations returns the embedded migration tree and the subdirectory + // within it that holds the .sql files. Return (nil, "") if the connector + // has no migrations. + Migrations() (fs.FS, string) + // Start kicks off the connector. It must return promptly; long-running + // work belongs in goroutines that respect ctx cancellation. + Start(ctx context.Context, deps Deps) error +} + +var ( + mu sync.Mutex + registry = map[string]Connector{} +) + +// Register adds a connector to the global registry. Typically called from an +// init() function in the connector's package. Re-registration with the same +// Name panics — connector names must be unique. +func Register(c Connector) { + mu.Lock() + defer mu.Unlock() + if _, ok := registry[c.Name()]; ok { + panic("connectors: duplicate registration for " + c.Name()) + } + registry[c.Name()] = c +} + +// All returns the registered connectors in deterministic (name-sorted) order. +func All() []Connector { + mu.Lock() + defer mu.Unlock() + names := make([]string, 0, len(registry)) + for n := range registry { + names = append(names, n) + } + sort.Strings(names) + out := make([]Connector, 0, len(names)) + for _, n := range names { + out = append(out, registry[n]) + } + return out +}
