This is an automated email from the ASF dual-hosted git repository.

gaoxingcun pushed a commit to branch reconfiguration_scheduling
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git

commit 8e63c7fff1d147c9715c9ec6559dfa0a6fcf5d96
Author: TJxiaobao <[email protected]>
AuthorDate: Mon Sep 8 21:21:18 2025 +0800

    add:All protocols are initialized using the init function.
---
 examples/main_simulation.go                        | 306 ---------------------
 internal/collector/basic/init.go                   |  37 ++-
 .../common/collect/strategy/strategy_factory.go    |   9 +
 .../common/dispatcher/common_dispatcher.go         |   3 +-
 internal/collector/common/job/job_server.go        |  49 ++--
 internal/collector/common/types/job/job_types.go   |   4 +-
 6 files changed, 68 insertions(+), 340 deletions(-)

diff --git a/examples/main_simulation.go b/examples/main_simulation.go
deleted file mode 100644
index a87665d..0000000
--- a/examples/main_simulation.go
+++ /dev/null
@@ -1,306 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package main
-
-//import (
-//     "context"
-//     "flag"
-//     "fmt"
-//     "os"
-//     "os/signal"
-//     "syscall"
-//     "time"
-//
-//     "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector"
-//     
"hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/dispatcher/entrance"
-//     "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
-//     "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types"
-//     jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job"
-//)
-//
-//// go build -ldflags "-X main.Version=x.y.z"
-//var (
-//     ConfPath   string
-//     Version    string
-//     Simulation bool // 新增:是否启用模拟模式
-//)
-//
-//func init() {
-//     flag.StringVar(&ConfPath, "conf", "hertzbeat-collector.yaml", "path to 
config file")
-//     flag.BoolVar(&Simulation, "simulation", true, "enable manager task 
simulation mode")
-//}
-//
-//func main() {
-//     flag.Parse()
-//
-//     // 初始化日志
-//     log := logger.DefaultLogger(os.Stdout, types.LogLevelInfo)
-//     log.Info("🚀 启动HertzBeat Collector", "version", Version, "simulation", 
Simulation)
-//     Simulation = true
-//     if Simulation {
-//             // 模拟模式:启动完整的CollectServer并模拟Manager发送任务
-//             runSimulationMode(log)
-//     } else {
-//             // 正常模式:启动标准的collector服务
-//             if err := collector.Bootstrap(ConfPath, Version); err != nil {
-//                     log.Error(err, "❌ 启动collector失败")
-//                     os.Exit(1)
-//             }
-//     }
-//}
-//
-//// runSimulationMode 运行模拟Manager发送任务的模式
-//func runSimulationMode(log logger.Logger) {
-//     log.Info("🎭 启动模拟模式:模拟Manager发送JDBC采集任务")
-//
-//     // 1. 创建CollectServer (完整的采集器架构)
-//     config := entrance.DefaultServerConfig()
-//     collectServer, err := entrance.NewCollectServer(config, log)
-//     if err != nil {
-//             log.Error(err, "❌ 创建CollectServer失败")
-//             return
-//     }
-//
-//     // 2. 启动CollectServer
-//     if err := collectServer.Start(); err != nil {
-//             log.Error(err, "❌ 启动CollectServer失败")
-//             return
-//     }
-//     defer collectServer.Stop()
-//
-//     log.Info("✅ CollectServer已启动,准备接收Manager任务")
-//
-//     // 3. 创建模拟的Manager任务
-//     jdbcJob := createManagerJDBCTask()
-//
-//     // 4. 启动任务模拟器
-//     ctx, cancel := context.WithCancel(context.Background())
-//     defer cancel()
-//
-//     // 启动模拟Manager发送任务的goroutine
-//     go simulateManagerTasks(ctx, collectServer, jdbcJob, log)
-//
-//     // 5. 等待中断信号
-//     log.Info("🔄 模拟器运行中 (每60秒模拟Manager发送任务),按Ctrl+C停止...")
-//     sigCh := make(chan os.Signal, 1)
-//     signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
-//     <-sigCh
-//
-//     log.Info("📴 收到停止信号,正在关闭...")
-//     cancel()
-//     time.Sleep(2 * time.Second)
-//     log.Info("👋 模拟器已停止")
-//}
-//
-//// createManagerJDBCTask 创建模拟从Manager接收的JDBC采集任务
-//func createManagerJDBCTask() *jobtypes.Job {
-//     return &jobtypes.Job{
-//             ID:              1001,
-//             TenantID:        1,
-//             MonitorID:       2001,
-//             App:             "mysql",
-//             Category:        "database",
-//             IsCyclic:        true,
-//             DefaultInterval: 30, // 30秒间隔
-//             Timestamp:       time.Now().Unix(),
-//
-//             // Manager发送的任务元数据
-//             Metadata: map[string]string{
-//                     "instancename":       "生产MySQL实例",
-//                     "instancehost":       "localhost:43306",
-//                     "manager_node":       "hertzbeat-manager-001",
-//                     "task_source":        "manager_scheduler",
-//                     "collector_assigned": "auto",
-//             },
-//
-//             Labels: map[string]string{
-//                     "env":        "production",
-//                     "database":   "mysql",
-//                     "region":     "localhost",
-//                     "managed_by": "hertzbeat",
-//                     "priority":   "high",
-//             },
-//
-//             Annotations: map[string]string{
-//                     "description":   "MySQL数据库性能监控",
-//                     "created_by":    "manager-scheduler",
-//                     "task_type":     "cyclic_monitoring",
-//                     "alert_enabled": "true",
-//             },
-//
-//             // JDBC采集指标配置
-//             Metrics: []jobtypes.Metrics{
-//                     {
-//                             Name:     "mysql_basic_info",
-//                             Priority: 0,
-//                             Protocol: "jdbc",
-//                             Host:     "localhost",
-//                             Port:     "3306",
-//                             Timeout:  "15s",
-//                             Interval: 30,
-//                             Fields: []jobtypes.Field{
-//                                     {Field: "database_name", Type: 1, 
Label: true},
-//                                     {Field: "version", Type: 1, Label: 
false},
-//                                     {Field: "uptime_seconds", Type: 0, 
Label: false},
-//                                     {Field: "server_id", Type: 0, Label: 
false},
-//                             },
-//                             JDBC: &jobtypes.JDBCProtocol{
-//                                     Host:      "localhost",
-//                                     Port:      "3306",
-//                                     Platform:  "mysql",
-//                                     Username:  "root",
-//                                     Password:  "password", // 请修改为实际密码
-//                                     Database:  "mysql",
-//                                     Timeout:   "15",
-//                                     QueryType: "oneRow",
-//                                     SQL: `SELECT
-//                                             DATABASE() as database_name,
-//                                             VERSION() as version,
-//                                             (SELECT VARIABLE_VALUE FROM 
INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME='Uptime') as 
uptime_seconds,
-//                                             @@server_id as server_id`,
-//                             },
-//                     },
-//             },
-//     }
-//}
-//
-//// simulateManagerTasks 模拟Manager定期发送采集任务
-//func simulateManagerTasks(ctx context.Context, collectServer 
*entrance.CollectServer, job *jobtypes.Job, log logger.Logger) {
-//     log.Info("📡 开始模拟Manager任务调度", "jobId", job.ID, "interval", "60s")
-//
-//     // 创建任务响应监听器 (模拟发送结果回Manager)
-//     eventListener := &ManagerResponseSimulator{
-//             logger:    log,
-//             jobID:     job.ID,
-//             monitorID: job.MonitorID,
-//     }
-//
-//     // 立即发送第一个任务 (模拟Manager初始调度)
-//     log.Info("📤 Manager发送初始任务", "timestamp", time.Now().Format("15:04:05"))
-//     if err := collectServer.ReceiveJob(job, eventListener); err != nil {
-//             log.Error(err, "❌ 接收初始任务失败")
-//             return
-//     }
-//
-//     // 模拟Manager定期发送任务更新/重新调度
-//     ticker := time.NewTicker(60 * time.Second) // 每60秒模拟一次Manager调度
-//     defer ticker.Stop()
-//
-//     taskSequence := 1
-//
-//     for {
-//             select {
-//             case <-ticker.C:
-//                     taskSequence++
-//
-//                     // 创建任务更新 (模拟Manager重新调度或配置更新)
-//                     updatedJob := *job
-//                     updatedJob.Timestamp = time.Now().Unix()
-//                     updatedJob.ID = job.ID + int64(taskSequence)
-//
-//                     // 更新任务元数据 (模拟Manager的管理信息)
-//                     updatedJob.Metadata["task_sequence"] = 
fmt.Sprintf("%d", taskSequence)
-//                     updatedJob.Metadata["last_scheduled"] = 
time.Now().Format(time.RFC3339)
-//                     updatedJob.Metadata["scheduler_version"] = "v1.0.0"
-//
-//                     log.Info("📤 Manager发送任务更新",
-//                             "sequence", taskSequence,
-//                             "jobId", updatedJob.ID,
-//                             "timestamp", time.Now().Format("15:04:05"))
-//
-//                     // 发送更新任务到Collector
-//                     if err := collectServer.ReceiveJob(&updatedJob, 
eventListener); err != nil {
-//                             log.Error(err, "❌ 接收任务更新失败", "sequence", 
taskSequence)
-//                     }
-//
-//             case <-ctx.Done():
-//                     log.Info("🛑 停止模拟Manager任务调度")
-//                     return
-//             }
-//     }
-//}
-//
-//// ManagerResponseSimulator 模拟Manager接收采集结果的响应处理器
-//type ManagerResponseSimulator struct {
-//     logger    logger.Logger
-//     jobID     int64
-//     monitorID int64
-//}
-//
-//// Response 处理采集结果 (模拟发送给Manager的过程)
-//func (mrs *ManagerResponseSimulator) Response(metricsData []interface{}) {
-//     mrs.logger.Info("📨 Collector采集完成,准备发送结果到Manager",
-//             "jobId", mrs.jobID,
-//             "monitorId", mrs.monitorID,
-//             "metricsCount", len(metricsData))
-//
-//     // 模拟处理每个采集指标的结果
-//     for i, data := range metricsData {
-//             if collectData, ok := data.(*jobtypes.CollectRepMetricsData); 
ok {
-//                     mrs.logger.Info("📊 处理采集指标",
-//                             "metricIndex", i+1,
-//                             "metricName", collectData.Metrics,
-//                             "code", collectData.Code,
-//                             "time", collectData.Time)
-//
-//                     if collectData.Code == 200 {
-//                             mrs.logger.Info("✅ 指标采集成功",
-//                                     "metric", collectData.Metrics,
-//                                     "fieldsCount", len(collectData.Fields),
-//                                     "valuesCount", len(collectData.Values))
-//
-//                             // 打印采集数据 (模拟发送到Manager的数据)
-//                             for rowIndex, valueRow := range 
collectData.Values {
-//                                     mrs.logger.Info("📈 采集数据",
-//                                             "metric", collectData.Metrics,
-//                                             "row", rowIndex+1,
-//                                             "values", valueRow.Columns)
-//                             }
-//                     } else {
-//                             mrs.logger.Error(fmt.Errorf("采集失败"), "❌ 指标采集错误",
-//                                     "metric", collectData.Metrics,
-//                                     "code", collectData.Code,
-//                                     "message", collectData.Msg)
-//                     }
-//             }
-//     }
-//
-//     // 模拟发送HTTP响应到Manager
-//     mrs.simulateHTTPResponseToManager(metricsData)
-//}
-//
-//// simulateHTTPResponseToManager 模拟通过HTTP将采集结果发送回Manager
-//func (mrs *ManagerResponseSimulator) 
simulateHTTPResponseToManager(metricsData []interface{}) {
-//     // 模拟HTTP请求参数
-//     managerEndpoint := "http://hertzbeat-manager:1157/api/collector/collect";
-//     collectorIdentity := "collector-go-001"
-//
-//     mrs.logger.Info("🚀 模拟HTTP请求发送采集结果",
-//             "endpoint", managerEndpoint,
-//             "collectorId", collectorIdentity,
-//             "jobId", mrs.jobID,
-//             "dataCount", len(metricsData),
-//             "timestamp", time.Now().Format("15:04:05"))
-//
-//     // 这里可以添加真实的HTTP客户端代码
-//     // httpClient.Post(managerEndpoint, jsonData)
-//
-//     mrs.logger.Info("📤 采集结果已发送到Manager (模拟成功)",
-//             "status", "200 OK",
-//             "responseTime", "15ms")
-//}
diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go
index e8456ed..74c4aa6 100644
--- a/internal/collector/basic/init.go
+++ b/internal/collector/basic/init.go
@@ -25,23 +25,32 @@ import (
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
+// init 函数在包被导入时自动执行
+// 集中注册所有协议的工厂函数
+func init() {
+       // 注册所有协议的工厂函数
+       // 新增协议时只需要在这里添加一行
+       strategy.RegisterFactory("jdbc", func(logger logger.Logger) 
strategy.Collector {
+               return database.NewJDBCCollector(logger)
+       })
+
+       // 未来可以在这里添加更多协议:
+       // strategy.RegisterFactory("http", func(logger logger.Logger) 
strategy.Collector {
+       //     return http.NewHTTPCollector(logger)
+       // })
+       // strategy.RegisterFactory("redis", func(logger logger.Logger) 
strategy.Collector {
+       //     return redis.NewRedisCollector(logger)
+       // })
+}
+
 // InitializeAllCollectors 初始化所有已注册的采集器
-// 当logger可用时调用此函数来创建实际的采集器实例
+// 此时init()函数已经注册了所有工厂函数
+// 这个函数创建实际的采集器实例
 func InitializeAllCollectors(logger logger.Logger) {
        logger.Info("initializing all collectors")
 
-       // 直接注册所有采集器实例
-       // 新增协议时在这里添加
-       jdbcCollector := database.NewJDBCCollector(logger)
-       strategy.Register(jdbcCollector)
-
-       // 未来可以在这里添加更多协议:
-       // httpCollector := http.NewHTTPCollector(logger)
-       // strategy.Register(httpCollector)
+       // 使用已注册的工厂函数创建采集器实例
+       strategy.InitializeCollectors(logger)
 
-       // 记录初始化结果
-       protocols := strategy.SupportedProtocols()
-       logger.Info("collectors initialized successfully",
-               "protocols", protocols,
-               "count", len(protocols))
+       logger.Info("all collectors initialized successfully")
 }
diff --git a/internal/collector/common/collect/strategy/strategy_factory.go 
b/internal/collector/common/collect/strategy/strategy_factory.go
index 9fc9af1..fa951b0 100644
--- a/internal/collector/common/collect/strategy/strategy_factory.go
+++ b/internal/collector/common/collect/strategy/strategy_factory.go
@@ -76,6 +76,15 @@ func InitializeCollectors(logger logger.Logger) {
        }
 
        globalRegistry.initialized = true
+
+       // Log initialization results
+       protocols := make([]string, 0, len(globalRegistry.collectors))
+       for protocol := range globalRegistry.collectors {
+               protocols = append(protocols, protocol)
+       }
+       logger.Info("collectors initialized from factories",
+               "protocols", protocols,
+               "count", len(protocols))
 }
 
 // Register a collector directly (legacy method, kept for compatibility)
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go 
b/internal/collector/common/dispatcher/common_dispatcher.go
index 95fb25e..d7a58c0 100644
--- a/internal/collector/common/dispatcher/common_dispatcher.go
+++ b/internal/collector/common/dispatcher/common_dispatcher.go
@@ -64,7 +64,6 @@ func NewCommonDispatcher(logger logger.Logger, 
metricsCollector MetricsCollector
 }
 
 // DispatchMetricsTask dispatches a job by breaking it down into individual 
metrics collection tasks
-// This is the core method that corresponds to Java's 
CommonDispatcher.dispatchMetricsTask
 func (cd *CommonDispatcherImpl) DispatchMetricsTask(job *jobtypes.Job, timeout 
*jobtypes.Timeout) error {
        if job == nil {
                return fmt.Errorf("job cannot be nil")
@@ -79,7 +78,7 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(job 
*jobtypes.Job, timeout *
        startTime := time.Now()
 
        // Get the next metrics to collect based on job priority and 
dependencies
-       metricsToCollect := job.GetNextCollectMetrics()
+       metricsToCollect := job.NextCollectMetrics()
        if len(metricsToCollect) == 0 {
                cd.logger.Info("no metrics to collect", "jobID", job.ID)
                return nil
diff --git a/internal/collector/common/job/job_server.go 
b/internal/collector/common/job/job_server.go
index 35cb195..31eed6f 100644
--- a/internal/collector/common/job/job_server.go
+++ b/internal/collector/common/job/job_server.go
@@ -24,6 +24,9 @@ import (
        "fmt"
        "sync"
 
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher"
        clrServer 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
        jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
@@ -43,7 +46,6 @@ type Config struct {
 }
 
 // Runner implements the service runner interface
-// It directly manages job scheduling through timeDispatch
 type Runner struct {
        Config
        timeDispatch TimeDispatcher
@@ -54,7 +56,6 @@ type Runner struct {
 }
 
 // AddAsyncCollectJob adds a job to async collection scheduling
-// This is the main entry point that corresponds to Java's 
collectJobService.addAsyncCollectJob
 func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error {
        if job == nil {
                r.Logger.Error(nil, "job cannot be nil")
@@ -116,21 +117,31 @@ func (r *Runner) RunningJobs() map[int64]*jobtypes.Job {
        return result
 }
 
-// New creates a new job service runner
+// New creates a new job service runner with all components initialized
 func New(srv *Config) *Runner {
        ctx, cancel := context.WithCancel(context.Background())
-       return &Runner{
-               Config:      *srv,
-               runningJobs: make(map[int64]*jobtypes.Job),
-               ctx:         ctx,
-               cancel:      cancel,
+
+       // Create result handler
+       resultHandler := collect.NewResultHandler(srv.Logger)
+
+       // Create metrics collector
+       metricsCollector := dispatch.NewMetricsCollector(srv.Logger)
+
+       // Create common dispatcher
+       commonDispatcher := dispatcher.NewCommonDispatcher(srv.Logger, 
metricsCollector, resultHandler)
+
+       // Create time dispatcher
+       timeDispatch := dispatcher.NewTimeDispatch(srv.Logger, commonDispatcher)
+
+       runner := &Runner{
+               Config:       *srv,
+               timeDispatch: timeDispatch,
+               runningJobs:  make(map[int64]*jobtypes.Job),
+               ctx:          ctx,
+               cancel:       cancel,
        }
-}
 
-// SetTimeDispatcher sets the time dispatcher for this runner
-// This should be called before starting the runner
-func (r *Runner) SetTimeDispatcher(td TimeDispatcher) {
-       r.timeDispatch = td
+       return runner
 }
 
 // Start starts the job service runner
@@ -138,12 +149,14 @@ func (r *Runner) Start(ctx context.Context) error {
        r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", 
r.Info().Name)
        r.Logger.Info("Starting job service runner")
 
-       // Start the time dispatcher if it's set
+       // Start the time dispatcher
        if r.timeDispatch != nil {
                if err := r.timeDispatch.Start(ctx); err != nil {
                        r.Logger.Error(err, "failed to start time dispatcher")
                        return fmt.Errorf("failed to start time dispatcher: 
%w", err)
                }
+       } else {
+               return fmt.Errorf("time dispatcher is not initialized")
        }
 
        r.Logger.Info("job service runner started successfully")
@@ -167,19 +180,23 @@ func (r *Runner) Info() collector.Info {
        }
 }
 
-// Close closes the job service runner
+// Close closes the job service runner and all its components
 func (r *Runner) Close() error {
        r.Logger.Info("closing job service runner")
 
        r.cancel()
 
+       // Stop time dispatcher
        if r.timeDispatch != nil {
                if err := r.timeDispatch.Stop(); err != nil {
                        r.Logger.Error(err, "error stopping time dispatcher")
-                       return err
+                       // Continue cleanup despite error
                }
        }
 
+       // Stop common dispatcher (via time dispatcher's commonDispatcher field)
+       // This is handled internally by the time dispatcher's Stop method
+
        // Clear running jobs
        r.mu.Lock()
        r.runningJobs = make(map[int64]*jobtypes.Job)
diff --git a/internal/collector/common/types/job/job_types.go 
b/internal/collector/common/types/job/job_types.go
index 9f1df89..28478b8 100644
--- a/internal/collector/common/types/job/job_types.go
+++ b/internal/collector/common/types/job/job_types.go
@@ -50,10 +50,10 @@ type Job struct {
        ResponseDataTemp []MetricsData        `json:"-"`
 }
 
-// GetNextCollectMetrics returns the metrics that should be collected next
+// NextCollectMetrics returns the metrics that should be collected next
 // This is a simplified version - in the full implementation this would handle
 // metric priorities, dependencies, and collection levels
-func (j *Job) GetNextCollectMetrics() []*Metrics {
+func (j *Job) NextCollectMetrics() []*Metrics {
        result := make([]*Metrics, 0, len(j.Metrics))
        for i := range j.Metrics {
                result = append(result, &j.Metrics[i])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to