This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch access-integration-v3 in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 2666a188d8ccb84a4c96c68ac000965c8657278f Author: lahiruj <[email protected]> AuthorDate: Wed May 20 18:20:11 2026 -0400 Add MigrateConnectorFS for per-connector schema isolation --- cmd/server/main.go | 2 ++ internal/db/migrate.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/cmd/server/main.go b/cmd/server/main.go index e2cc07006..34d12bbd9 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -66,6 +66,8 @@ func run() error { } defer database.Close() + // Core schema must run before connector migrations because connector + // schemas may FK into core tables. if err := db.MigrateEmbedded(database); err != nil { return err } diff --git a/internal/db/migrate.go b/internal/db/migrate.go index 446c26b9e..f8a53cfea 100644 --- a/internal/db/migrate.go +++ b/internal/db/migrate.go @@ -20,7 +20,9 @@ package db import ( "errors" "fmt" + "io/fs" "log/slog" + "regexp" "github.com/golang-migrate/migrate/v4" "github.com/golang-migrate/migrate/v4/database/mysql" @@ -53,3 +55,56 @@ func MigrateEmbedded(database *sqlx.DB) error { slog.Info("database migrations applied successfully") return nil } + +// connectorNamePattern restricts connector names to a safe identifier shape so +// the value can be concatenated into a SQL table name (schema_migrations_<name>) +// without escaping concerns. +var connectorNamePattern = regexp.MustCompile(`^[a-z][a-z0-9_]{0,30}$`) + +// MigrateConnectorFS applies pending migrations from src against the database, +// tracking version state in schema_migrations_<name> so each connector +// versions its schema independently of core and of other connectors. +func MigrateConnectorFS(database *sqlx.DB, src fs.FS, dir, name string) error { + if database == nil { + return errors.New("database is required") + } + if src == nil { + return errors.New("migration source is required") + } + if dir == "" { + return errors.New("migration directory is required") + } + if !connectorNamePattern.MatchString(name) { + return fmt.Errorf("connector name %q is invalid; must match %s", name, connectorNamePattern) + } + + driver, err := mysql.WithInstance(database.DB, &mysql.Config{ + MigrationsTable: "schema_migrations_" + name, + }) + if err != nil { + return fmt.Errorf("create migration driver for %s: %w", name, err) + } + + source, err := iofs.New(src, dir) + if err != nil { + return fmt.Errorf("create migration source for %s: %w", name, err) + } + + m, err := migrate.NewWithInstance("iofs", source, "mysql", driver) + if err != nil { + return fmt.Errorf("create migrator for %s: %w", name, err) + } + + before, _, _ := m.Version() + upErr := m.Up() + after, _, _ := m.Version() + if upErr != nil && !errors.Is(upErr, migrate.ErrNoChange) { + return fmt.Errorf("run migrations for %s: %w", name, upErr) + } + if errors.Is(upErr, migrate.ErrNoChange) { + slog.Info("connector migrations already up to date", "connector", name, "version", after) + return nil + } + slog.Info("connector migrations applied", "connector", name, "from", before, "to", after) + return nil +}
