This is an automated email from the ASF dual-hosted git repository. gaoxingcun pushed a commit to branch reconfiguration_scheduling in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit 8e63c7fff1d147c9715c9ec6559dfa0a6fcf5d96 Author: TJxiaobao <[email protected]> AuthorDate: Mon Sep 8 21:21:18 2025 +0800 add:All protocols are initialized using the init function. --- examples/main_simulation.go | 306 --------------------- internal/collector/basic/init.go | 37 ++- .../common/collect/strategy/strategy_factory.go | 9 + .../common/dispatcher/common_dispatcher.go | 3 +- internal/collector/common/job/job_server.go | 49 ++-- internal/collector/common/types/job/job_types.go | 4 +- 6 files changed, 68 insertions(+), 340 deletions(-) diff --git a/examples/main_simulation.go b/examples/main_simulation.go deleted file mode 100644 index a87665d..0000000 --- a/examples/main_simulation.go +++ /dev/null @@ -1,306 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package main - -//import ( -// "context" -// "flag" -// "fmt" -// "os" -// "os/signal" -// "syscall" -// "time" -// -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector" -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/dispatcher/entrance" -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" -// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" -//) -// -//// go build -ldflags "-X main.Version=x.y.z" -//var ( -// ConfPath string -// Version string -// Simulation bool // 新增:是否启用模拟模式 -//) -// -//func init() { -// flag.StringVar(&ConfPath, "conf", "hertzbeat-collector.yaml", "path to config file") -// flag.BoolVar(&Simulation, "simulation", true, "enable manager task simulation mode") -//} -// -//func main() { -// flag.Parse() -// -// // 初始化日志 -// log := logger.DefaultLogger(os.Stdout, types.LogLevelInfo) -// log.Info("🚀 启动HertzBeat Collector", "version", Version, "simulation", Simulation) -// Simulation = true -// if Simulation { -// // 模拟模式:启动完整的CollectServer并模拟Manager发送任务 -// runSimulationMode(log) -// } else { -// // 正常模式:启动标准的collector服务 -// if err := collector.Bootstrap(ConfPath, Version); err != nil { -// log.Error(err, "❌ 启动collector失败") -// os.Exit(1) -// } -// } -//} -// -//// runSimulationMode 运行模拟Manager发送任务的模式 -//func runSimulationMode(log logger.Logger) { -// log.Info("🎭 启动模拟模式:模拟Manager发送JDBC采集任务") -// -// // 1. 创建CollectServer (完整的采集器架构) -// config := entrance.DefaultServerConfig() -// collectServer, err := entrance.NewCollectServer(config, log) -// if err != nil { -// log.Error(err, "❌ 创建CollectServer失败") -// return -// } -// -// // 2. 启动CollectServer -// if err := collectServer.Start(); err != nil { -// log.Error(err, "❌ 启动CollectServer失败") -// return -// } -// defer collectServer.Stop() -// -// log.Info("✅ CollectServer已启动,准备接收Manager任务") -// -// // 3. 创建模拟的Manager任务 -// jdbcJob := createManagerJDBCTask() -// -// // 4. 启动任务模拟器 -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// // 启动模拟Manager发送任务的goroutine -// go simulateManagerTasks(ctx, collectServer, jdbcJob, log) -// -// // 5. 等待中断信号 -// log.Info("🔄 模拟器运行中 (每60秒模拟Manager发送任务),按Ctrl+C停止...") -// sigCh := make(chan os.Signal, 1) -// signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) -// <-sigCh -// -// log.Info("📴 收到停止信号,正在关闭...") -// cancel() -// time.Sleep(2 * time.Second) -// log.Info("👋 模拟器已停止") -//} -// -//// createManagerJDBCTask 创建模拟从Manager接收的JDBC采集任务 -//func createManagerJDBCTask() *jobtypes.Job { -// return &jobtypes.Job{ -// ID: 1001, -// TenantID: 1, -// MonitorID: 2001, -// App: "mysql", -// Category: "database", -// IsCyclic: true, -// DefaultInterval: 30, // 30秒间隔 -// Timestamp: time.Now().Unix(), -// -// // Manager发送的任务元数据 -// Metadata: map[string]string{ -// "instancename": "生产MySQL实例", -// "instancehost": "localhost:43306", -// "manager_node": "hertzbeat-manager-001", -// "task_source": "manager_scheduler", -// "collector_assigned": "auto", -// }, -// -// Labels: map[string]string{ -// "env": "production", -// "database": "mysql", -// "region": "localhost", -// "managed_by": "hertzbeat", -// "priority": "high", -// }, -// -// Annotations: map[string]string{ -// "description": "MySQL数据库性能监控", -// "created_by": "manager-scheduler", -// "task_type": "cyclic_monitoring", -// "alert_enabled": "true", -// }, -// -// // JDBC采集指标配置 -// Metrics: []jobtypes.Metrics{ -// { -// Name: "mysql_basic_info", -// Priority: 0, -// Protocol: "jdbc", -// Host: "localhost", -// Port: "3306", -// Timeout: "15s", -// Interval: 30, -// Fields: []jobtypes.Field{ -// {Field: "database_name", Type: 1, Label: true}, -// {Field: "version", Type: 1, Label: false}, -// {Field: "uptime_seconds", Type: 0, Label: false}, -// {Field: "server_id", Type: 0, Label: false}, -// }, -// JDBC: &jobtypes.JDBCProtocol{ -// Host: "localhost", -// Port: "3306", -// Platform: "mysql", -// Username: "root", -// Password: "password", // 请修改为实际密码 -// Database: "mysql", -// Timeout: "15", -// QueryType: "oneRow", -// SQL: `SELECT -// DATABASE() as database_name, -// VERSION() as version, -// (SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME='Uptime') as uptime_seconds, -// @@server_id as server_id`, -// }, -// }, -// }, -// } -//} -// -//// simulateManagerTasks 模拟Manager定期发送采集任务 -//func simulateManagerTasks(ctx context.Context, collectServer *entrance.CollectServer, job *jobtypes.Job, log logger.Logger) { -// log.Info("📡 开始模拟Manager任务调度", "jobId", job.ID, "interval", "60s") -// -// // 创建任务响应监听器 (模拟发送结果回Manager) -// eventListener := &ManagerResponseSimulator{ -// logger: log, -// jobID: job.ID, -// monitorID: job.MonitorID, -// } -// -// // 立即发送第一个任务 (模拟Manager初始调度) -// log.Info("📤 Manager发送初始任务", "timestamp", time.Now().Format("15:04:05")) -// if err := collectServer.ReceiveJob(job, eventListener); err != nil { -// log.Error(err, "❌ 接收初始任务失败") -// return -// } -// -// // 模拟Manager定期发送任务更新/重新调度 -// ticker := time.NewTicker(60 * time.Second) // 每60秒模拟一次Manager调度 -// defer ticker.Stop() -// -// taskSequence := 1 -// -// for { -// select { -// case <-ticker.C: -// taskSequence++ -// -// // 创建任务更新 (模拟Manager重新调度或配置更新) -// updatedJob := *job -// updatedJob.Timestamp = time.Now().Unix() -// updatedJob.ID = job.ID + int64(taskSequence) -// -// // 更新任务元数据 (模拟Manager的管理信息) -// updatedJob.Metadata["task_sequence"] = fmt.Sprintf("%d", taskSequence) -// updatedJob.Metadata["last_scheduled"] = time.Now().Format(time.RFC3339) -// updatedJob.Metadata["scheduler_version"] = "v1.0.0" -// -// log.Info("📤 Manager发送任务更新", -// "sequence", taskSequence, -// "jobId", updatedJob.ID, -// "timestamp", time.Now().Format("15:04:05")) -// -// // 发送更新任务到Collector -// if err := collectServer.ReceiveJob(&updatedJob, eventListener); err != nil { -// log.Error(err, "❌ 接收任务更新失败", "sequence", taskSequence) -// } -// -// case <-ctx.Done(): -// log.Info("🛑 停止模拟Manager任务调度") -// return -// } -// } -//} -// -//// ManagerResponseSimulator 模拟Manager接收采集结果的响应处理器 -//type ManagerResponseSimulator struct { -// logger logger.Logger -// jobID int64 -// monitorID int64 -//} -// -//// Response 处理采集结果 (模拟发送给Manager的过程) -//func (mrs *ManagerResponseSimulator) Response(metricsData []interface{}) { -// mrs.logger.Info("📨 Collector采集完成,准备发送结果到Manager", -// "jobId", mrs.jobID, -// "monitorId", mrs.monitorID, -// "metricsCount", len(metricsData)) -// -// // 模拟处理每个采集指标的结果 -// for i, data := range metricsData { -// if collectData, ok := data.(*jobtypes.CollectRepMetricsData); ok { -// mrs.logger.Info("📊 处理采集指标", -// "metricIndex", i+1, -// "metricName", collectData.Metrics, -// "code", collectData.Code, -// "time", collectData.Time) -// -// if collectData.Code == 200 { -// mrs.logger.Info("✅ 指标采集成功", -// "metric", collectData.Metrics, -// "fieldsCount", len(collectData.Fields), -// "valuesCount", len(collectData.Values)) -// -// // 打印采集数据 (模拟发送到Manager的数据) -// for rowIndex, valueRow := range collectData.Values { -// mrs.logger.Info("📈 采集数据", -// "metric", collectData.Metrics, -// "row", rowIndex+1, -// "values", valueRow.Columns) -// } -// } else { -// mrs.logger.Error(fmt.Errorf("采集失败"), "❌ 指标采集错误", -// "metric", collectData.Metrics, -// "code", collectData.Code, -// "message", collectData.Msg) -// } -// } -// } -// -// // 模拟发送HTTP响应到Manager -// mrs.simulateHTTPResponseToManager(metricsData) -//} -// -//// simulateHTTPResponseToManager 模拟通过HTTP将采集结果发送回Manager -//func (mrs *ManagerResponseSimulator) simulateHTTPResponseToManager(metricsData []interface{}) { -// // 模拟HTTP请求参数 -// managerEndpoint := "http://hertzbeat-manager:1157/api/collector/collect" -// collectorIdentity := "collector-go-001" -// -// mrs.logger.Info("🚀 模拟HTTP请求发送采集结果", -// "endpoint", managerEndpoint, -// "collectorId", collectorIdentity, -// "jobId", mrs.jobID, -// "dataCount", len(metricsData), -// "timestamp", time.Now().Format("15:04:05")) -// -// // 这里可以添加真实的HTTP客户端代码 -// // httpClient.Post(managerEndpoint, jsonData) -// -// mrs.logger.Info("📤 采集结果已发送到Manager (模拟成功)", -// "status", "200 OK", -// "responseTime", "15ms") -//} diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go index e8456ed..74c4aa6 100644 --- a/internal/collector/basic/init.go +++ b/internal/collector/basic/init.go @@ -25,23 +25,32 @@ import ( "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) +// init 函数在包被导入时自动执行 +// 集中注册所有协议的工厂函数 +func init() { + // 注册所有协议的工厂函数 + // 新增协议时只需要在这里添加一行 + strategy.RegisterFactory("jdbc", func(logger logger.Logger) strategy.Collector { + return database.NewJDBCCollector(logger) + }) + + // 未来可以在这里添加更多协议: + // strategy.RegisterFactory("http", func(logger logger.Logger) strategy.Collector { + // return http.NewHTTPCollector(logger) + // }) + // strategy.RegisterFactory("redis", func(logger logger.Logger) strategy.Collector { + // return redis.NewRedisCollector(logger) + // }) +} + // InitializeAllCollectors 初始化所有已注册的采集器 -// 当logger可用时调用此函数来创建实际的采集器实例 +// 此时init()函数已经注册了所有工厂函数 +// 这个函数创建实际的采集器实例 func InitializeAllCollectors(logger logger.Logger) { logger.Info("initializing all collectors") - // 直接注册所有采集器实例 - // 新增协议时在这里添加 - jdbcCollector := database.NewJDBCCollector(logger) - strategy.Register(jdbcCollector) - - // 未来可以在这里添加更多协议: - // httpCollector := http.NewHTTPCollector(logger) - // strategy.Register(httpCollector) + // 使用已注册的工厂函数创建采集器实例 + strategy.InitializeCollectors(logger) - // 记录初始化结果 - protocols := strategy.SupportedProtocols() - logger.Info("collectors initialized successfully", - "protocols", protocols, - "count", len(protocols)) + logger.Info("all collectors initialized successfully") } diff --git a/internal/collector/common/collect/strategy/strategy_factory.go b/internal/collector/common/collect/strategy/strategy_factory.go index 9fc9af1..fa951b0 100644 --- a/internal/collector/common/collect/strategy/strategy_factory.go +++ b/internal/collector/common/collect/strategy/strategy_factory.go @@ -76,6 +76,15 @@ func InitializeCollectors(logger logger.Logger) { } globalRegistry.initialized = true + + // Log initialization results + protocols := make([]string, 0, len(globalRegistry.collectors)) + for protocol := range globalRegistry.collectors { + protocols = append(protocols, protocol) + } + logger.Info("collectors initialized from factories", + "protocols", protocols, + "count", len(protocols)) } // Register a collector directly (legacy method, kept for compatibility) diff --git a/internal/collector/common/dispatcher/common_dispatcher.go b/internal/collector/common/dispatcher/common_dispatcher.go index 95fb25e..d7a58c0 100644 --- a/internal/collector/common/dispatcher/common_dispatcher.go +++ b/internal/collector/common/dispatcher/common_dispatcher.go @@ -64,7 +64,6 @@ func NewCommonDispatcher(logger logger.Logger, metricsCollector MetricsCollector } // DispatchMetricsTask dispatches a job by breaking it down into individual metrics collection tasks -// This is the core method that corresponds to Java's CommonDispatcher.dispatchMetricsTask func (cd *CommonDispatcherImpl) DispatchMetricsTask(job *jobtypes.Job, timeout *jobtypes.Timeout) error { if job == nil { return fmt.Errorf("job cannot be nil") @@ -79,7 +78,7 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(job *jobtypes.Job, timeout * startTime := time.Now() // Get the next metrics to collect based on job priority and dependencies - metricsToCollect := job.GetNextCollectMetrics() + metricsToCollect := job.NextCollectMetrics() if len(metricsToCollect) == 0 { cd.logger.Info("no metrics to collect", "jobID", job.ID) return nil diff --git a/internal/collector/common/job/job_server.go b/internal/collector/common/job/job_server.go index 35cb195..31eed6f 100644 --- a/internal/collector/common/job/job_server.go +++ b/internal/collector/common/job/job_server.go @@ -24,6 +24,9 @@ import ( "fmt" "sync" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher" clrServer "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" @@ -43,7 +46,6 @@ type Config struct { } // Runner implements the service runner interface -// It directly manages job scheduling through timeDispatch type Runner struct { Config timeDispatch TimeDispatcher @@ -54,7 +56,6 @@ type Runner struct { } // AddAsyncCollectJob adds a job to async collection scheduling -// This is the main entry point that corresponds to Java's collectJobService.addAsyncCollectJob func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error { if job == nil { r.Logger.Error(nil, "job cannot be nil") @@ -116,21 +117,31 @@ func (r *Runner) RunningJobs() map[int64]*jobtypes.Job { return result } -// New creates a new job service runner +// New creates a new job service runner with all components initialized func New(srv *Config) *Runner { ctx, cancel := context.WithCancel(context.Background()) - return &Runner{ - Config: *srv, - runningJobs: make(map[int64]*jobtypes.Job), - ctx: ctx, - cancel: cancel, + + // Create result handler + resultHandler := collect.NewResultHandler(srv.Logger) + + // Create metrics collector + metricsCollector := dispatch.NewMetricsCollector(srv.Logger) + + // Create common dispatcher + commonDispatcher := dispatcher.NewCommonDispatcher(srv.Logger, metricsCollector, resultHandler) + + // Create time dispatcher + timeDispatch := dispatcher.NewTimeDispatch(srv.Logger, commonDispatcher) + + runner := &Runner{ + Config: *srv, + timeDispatch: timeDispatch, + runningJobs: make(map[int64]*jobtypes.Job), + ctx: ctx, + cancel: cancel, } -} -// SetTimeDispatcher sets the time dispatcher for this runner -// This should be called before starting the runner -func (r *Runner) SetTimeDispatcher(td TimeDispatcher) { - r.timeDispatch = td + return runner } // Start starts the job service runner @@ -138,12 +149,14 @@ func (r *Runner) Start(ctx context.Context) error { r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name) r.Logger.Info("Starting job service runner") - // Start the time dispatcher if it's set + // Start the time dispatcher if r.timeDispatch != nil { if err := r.timeDispatch.Start(ctx); err != nil { r.Logger.Error(err, "failed to start time dispatcher") return fmt.Errorf("failed to start time dispatcher: %w", err) } + } else { + return fmt.Errorf("time dispatcher is not initialized") } r.Logger.Info("job service runner started successfully") @@ -167,19 +180,23 @@ func (r *Runner) Info() collector.Info { } } -// Close closes the job service runner +// Close closes the job service runner and all its components func (r *Runner) Close() error { r.Logger.Info("closing job service runner") r.cancel() + // Stop time dispatcher if r.timeDispatch != nil { if err := r.timeDispatch.Stop(); err != nil { r.Logger.Error(err, "error stopping time dispatcher") - return err + // Continue cleanup despite error } } + // Stop common dispatcher (via time dispatcher's commonDispatcher field) + // This is handled internally by the time dispatcher's Stop method + // Clear running jobs r.mu.Lock() r.runningJobs = make(map[int64]*jobtypes.Job) diff --git a/internal/collector/common/types/job/job_types.go b/internal/collector/common/types/job/job_types.go index 9f1df89..28478b8 100644 --- a/internal/collector/common/types/job/job_types.go +++ b/internal/collector/common/types/job/job_types.go @@ -50,10 +50,10 @@ type Job struct { ResponseDataTemp []MetricsData `json:"-"` } -// GetNextCollectMetrics returns the metrics that should be collected next +// NextCollectMetrics returns the metrics that should be collected next // This is a simplified version - in the full implementation this would handle // metric priorities, dependencies, and collection levels -func (j *Job) GetNextCollectMetrics() []*Metrics { +func (j *Job) NextCollectMetrics() []*Metrics { result := make([]*Metrics, 0, len(j.Metrics)) for i := range j.Metrics { result = append(result, &j.Metrics[i]) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
