This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch access-integration-v3
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 8b4a0b478f0bafc321453cbb37d8a4b7aff5e7e4
Author: lahiruj <[email protected]>
AuthorDate: Thu May 21 03:37:21 2026 -0400

    Wait for connector goroutines to drain on shutdown
---
 cmd/server/main.go                                  | 20 +++++++++++++++++++-
 connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go | 20 ++++++++++----------
 .../SLURM/Association-Mapper/pkg/smapper/loader.go  |  3 ++-
 internal/connectors/loader.go                       |  7 ++++---
 4 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/cmd/server/main.go b/cmd/server/main.go
index 6c3a96037..14a954b30 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -26,6 +26,7 @@ import (
 	"os"
 	"os/signal"
 	"strconv"
+	"sync"
 	"syscall"
 	"time"
 
@@ -79,7 +80,10 @@ func run() error {
 	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
 	defer stop()
 
-	if err := connectors.LoadConnectors(ctx, database, eventBus, svc); err != nil {
+	// Tracks every background goroutine spawned by connectors so we can wait
+	// for them to drain on shutdown instead of killing them mid-flight.
+	var connectorsWG sync.WaitGroup
+	if err := connectors.LoadConnectors(ctx, database, eventBus, svc, &connectorsWG); err != nil {
 		return err
 	}
 
@@ -117,6 +121,20 @@ func run() error {
 	if err := httpServer.Shutdown(shutdownCtx); err != nil {
 		return err
 	}
+
+	slog.Info("waiting for connectors to drain")
+	connectorsDone := make(chan struct{})
+	go func() {
+		connectorsWG.Wait()
+		close(connectorsDone)
+	}()
+	select {
+	case <-connectorsDone:
+		slog.Info("connectors drained cleanly")
+	case <-time.After(30 * time.Second):
+		slog.Warn("connector drain timed out; some workers may have leaked")
+	}
+
 	slog.Info("server stopped cleanly")
 	return nil
 }
diff --git a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
index e6dc0d5d5..7dcc52c7d 100644
--- a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
+++ b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
@@ -43,10 +43,9 @@ import (
 
 const connectorName = "amie"
 
-// LoadConnector applies the connector's migrations and starts its background
-// workers. It returns nil and skips when AMIE_BASE_URL / AMIE_SITE_CODE /
+// LoadConnector skips silently when AMIE_BASE_URL / AMIE_SITE_CODE /
 // AMIE_API_KEY are not all set.
-func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, coreService *coreservice.Service) error {
+func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, coreService *coreservice.Service, wg *sync.WaitGroup) error {
 	cfg := loadConfig()
 	if cfg.AMIE.APIKey == "" || cfg.AMIE.BaseURL == "" || cfg.AMIE.SiteCode == "" {
 		slog.Warn("AMIE credentials not fully provided, skipping AMIE connector")
@@ -92,15 +91,16 @@ func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus,
 	poller := worker.NewPoller(amie, packetStore, eventStore, met, database, cfg.AMIE)
 	processor := worker.NewProcessor(eventStore, packetStore, errorStore, router, met, database, cfg.AMIE)
 
-	var wg sync.WaitGroup
 	wg.Add(2)
-	go func() { defer wg.Done(); poller.Run(ctx) }()
-	go func() { defer wg.Done(); processor.Run(ctx) }()
-
 	go func() {
-		<-ctx.Done()
-		wg.Wait()
-		slog.Info("AMIE connector stopped")
+		defer wg.Done()
+		poller.Run(ctx)
+		slog.Info("AMIE poller stopped")
+	}()
+	go func() {
+		defer wg.Done()
+		processor.Run(ctx)
+		slog.Info("AMIE processor stopped")
 	}()
 
 	slog.Info("AMIE connector started", "site", cfg.AMIE.SiteCode, "baseURL", cfg.AMIE.BaseURL)
diff --git a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
index 7efd4b29a..9eee2ae74 100644
--- a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
+++ b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"log/slog"
 	"os"
+	"sync"
 
 	"github.com/jmoiron/sqlx"
 
@@ -13,7 +14,7 @@ import (
 	"github.com/apache/airavata-custos/pkg/service"
 )
 
-func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, coreService *service.Service) error {
+func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, coreService *service.Service, _ *sync.WaitGroup) error {
 	// Read url, username, and password from environment variables
 	apiUrl :=
 		os.Getenv("SLURM_API")
diff --git a/internal/connectors/loader.go b/internal/connectors/loader.go
index 0632fd7eb..99fb221c0 100644
--- a/internal/connectors/loader.go
+++ b/internal/connectors/loader.go
@@ -20,6 +20,7 @@ package connectors
 import (
 	"context"
 	"log/slog"
+	"sync"
 
 	"github.com/jmoiron/sqlx"
 
@@ -29,17 +30,17 @@ import (
 	"github.com/apache/airavata-custos/pkg/service"
 )
 
-func LoadConnectors(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, coreService *service.Service) error {
+func LoadConnectors(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, coreService *service.Service, wg *sync.WaitGroup) error {
 	slog.Info("loading connectors")
 
 	slog.Info("loading SLURM Association Mapper connector")
-	if err := smapper.LoadConnector(ctx, database, eventBus, coreService); err != nil {
+	if err := smapper.LoadConnector(ctx, database, eventBus, coreService, wg); err != nil {
 		slog.Error("failed to load SLURM Association Mapper connector", "error", err)
 		return err
 	}
 
 	slog.Info("loading AMIE connector")
-	if err := amie.LoadConnector(ctx, database, eventBus, coreService); err != nil {
+	if err := amie.LoadConnector(ctx, database, eventBus, coreService, wg); err != nil {
 		slog.Error("failed to load AMIE connector", "error", err)
 		return err
 	}
