This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 47ef6c3  Reconfiguration scheduling (#11)
47ef6c3 is described below

commit 47ef6c331eb9c84f7dd2a20d31623eeb6822a9a5
Author: 铁甲小宝 <[email protected]>
AuthorDate: Mon Sep 8 21:40:08 2025 +0800

    Reconfiguration scheduling (#11)
    
    * add:The first version of the modified schedule
    
    * add:All protocols are initialized using the init function.
    
    ---------
    
    Co-authored-by: Logic <[email protected]>
---
 examples/main_simulation.go                        | 306 ---------------------
 internal/cmd/server.go                             |   6 +
 internal/collector/basic/abstract_collect.go       | 119 --------
 .../collector/basic/database/jdbc_collector.go     |  66 +++--
 internal/collector/basic/init.go                   |  56 ++++
 internal/collector/basic/registry/.keep            |   0
 .../common/collect/dispatch/metrics_collector.go   | 141 ++++++++++
 .../collector/common/collect/result_handler.go     |  77 ++++++
 .../common/collect/strategy/strategy_factory.go    | 119 ++++++--
 .../common/dispatcher/common_dispatcher.go         | 244 ++++++++++++++++
 .../common/dispatcher/hashed_wheel_timer.go        | 226 +++++++++++++++
 internal/collector/common/dispatcher/time_wheel.go | 223 +++++++++++++++
 .../common/dispatcher/wheel_timer_task.go          | 105 +++++++
 internal/collector/common/job/job_server.go        | 156 ++++++++++-
 internal/collector/common/types/job/job_types.go   |   4 +-
 .../collector/common/types/job/timeout_types.go    |  18 +-
 16 files changed, 1383 insertions(+), 483 deletions(-)

diff --git a/examples/main_simulation.go b/examples/main_simulation.go
deleted file mode 100644
index a87665d..0000000
--- a/examples/main_simulation.go
+++ /dev/null
@@ -1,306 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package main
-
-//import (
-//     "context"
-//     "flag"
-//     "fmt"
-//     "os"
-//     "os/signal"
-//     "syscall"
-//     "time"
-//
-//     "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector"
-//     "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/dispatcher/entrance"
-//     "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
-//     "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types"
-//     jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job"
-//)
-//
-//// go build -ldflags "-X main.Version=x.y.z"
-//var (
-//     ConfPath   string
-//     Version    string
-//     Simulation bool // 新增:是否启用模拟模式
-//)
-//
-//func init() {
-//     flag.StringVar(&ConfPath, "conf", "hertzbeat-collector.yaml", "path to config file")
-//     flag.BoolVar(&Simulation, "simulation", true, "enable manager task simulation mode")
-//}
-//
-//func main() {
-//     flag.Parse()
-//
-//     // 初始化日志
-//     log := logger.DefaultLogger(os.Stdout, types.LogLevelInfo)
-//     log.Info("🚀 启动HertzBeat Collector", "version", Version, "simulation", Simulation)
-//     Simulation = true
-//     if Simulation {
-//             // 模拟模式:启动完整的CollectServer并模拟Manager发送任务
-//             runSimulationMode(log)
-//     } else {
-//             // 正常模式:启动标准的collector服务
-//             if err := collector.Bootstrap(ConfPath, Version); err != nil {
-//                     log.Error(err, "❌ 启动collector失败")
-//                     os.Exit(1)
-//             }
-//     }
-//}
-//
-//// runSimulationMode 运行模拟Manager发送任务的模式
-//func runSimulationMode(log logger.Logger) {
-//     log.Info("🎭 启动模拟模式:模拟Manager发送JDBC采集任务")
-//
-//     // 1. 创建CollectServer (完整的采集器架构)
-//     config := entrance.DefaultServerConfig()
-//     collectServer, err := entrance.NewCollectServer(config, log)
-//     if err != nil {
-//             log.Error(err, "❌ 创建CollectServer失败")
-//             return
-//     }
-//
-//     // 2. 启动CollectServer
-//     if err := collectServer.Start(); err != nil {
-//             log.Error(err, "❌ 启动CollectServer失败")
-//             return
-//     }
-//     defer collectServer.Stop()
-//
-//     log.Info("✅ CollectServer已启动,准备接收Manager任务")
-//
-//     // 3. 创建模拟的Manager任务
-//     jdbcJob := createManagerJDBCTask()
-//
-//     // 4. 启动任务模拟器
-//     ctx, cancel := context.WithCancel(context.Background())
-//     defer cancel()
-//
-//     // 启动模拟Manager发送任务的goroutine
-//     go simulateManagerTasks(ctx, collectServer, jdbcJob, log)
-//
-//     // 5. 等待中断信号
-//     log.Info("🔄 模拟器运行中 (每60秒模拟Manager发送任务),按Ctrl+C停止...")
-//     sigCh := make(chan os.Signal, 1)
-//     signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
-//     <-sigCh
-//
-//     log.Info("📴 收到停止信号,正在关闭...")
-//     cancel()
-//     time.Sleep(2 * time.Second)
-//     log.Info("👋 模拟器已停止")
-//}
-//
-//// createManagerJDBCTask 创建模拟从Manager接收的JDBC采集任务
-//func createManagerJDBCTask() *jobtypes.Job {
-//     return &jobtypes.Job{
-//             ID:              1001,
-//             TenantID:        1,
-//             MonitorID:       2001,
-//             App:             "mysql",
-//             Category:        "database",
-//             IsCyclic:        true,
-//             DefaultInterval: 30, // 30秒间隔
-//             Timestamp:       time.Now().Unix(),
-//
-//             // Manager发送的任务元数据
-//             Metadata: map[string]string{
-//                     "instancename":       "生产MySQL实例",
-//                     "instancehost":       "localhost:43306",
-//                     "manager_node":       "hertzbeat-manager-001",
-//                     "task_source":        "manager_scheduler",
-//                     "collector_assigned": "auto",
-//             },
-//
-//             Labels: map[string]string{
-//                     "env":        "production",
-//                     "database":   "mysql",
-//                     "region":     "localhost",
-//                     "managed_by": "hertzbeat",
-//                     "priority":   "high",
-//             },
-//
-//             Annotations: map[string]string{
-//                     "description":   "MySQL数据库性能监控",
-//                     "created_by":    "manager-scheduler",
-//                     "task_type":     "cyclic_monitoring",
-//                     "alert_enabled": "true",
-//             },
-//
-//             // JDBC采集指标配置
-//             Metrics: []jobtypes.Metrics{
-//                     {
-//                             Name:     "mysql_basic_info",
-//                             Priority: 0,
-//                             Protocol: "jdbc",
-//                             Host:     "localhost",
-//                             Port:     "3306",
-//                             Timeout:  "15s",
-//                             Interval: 30,
-//                             Fields: []jobtypes.Field{
-//                                     {Field: "database_name", Type: 1, Label: true},
-//                                     {Field: "version", Type: 1, Label: false},
-//                                     {Field: "uptime_seconds", Type: 0, Label: false},
-//                                     {Field: "server_id", Type: 0, Label: false},
-//                             },
-//                             JDBC: &jobtypes.JDBCProtocol{
-//                                     Host:      "localhost",
-//                                     Port:      "3306",
-//                                     Platform:  "mysql",
-//                                     Username:  "root",
-//                                     Password:  "password", // 请修改为实际密码
-//                                     Database:  "mysql",
-//                                     Timeout:   "15",
-//                                     QueryType: "oneRow",
-//                                     SQL: `SELECT
-//                                             DATABASE() as database_name,
-//                                             VERSION() as version,
-//                                             (SELECT VARIABLE_VALUE FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME='Uptime') as uptime_seconds,
-//                                             @@server_id as server_id`,
-//                             },
-//                     },
-//             },
-//     }
-//}
-//
-//// simulateManagerTasks 模拟Manager定期发送采集任务
-//func simulateManagerTasks(ctx context.Context, collectServer *entrance.CollectServer, job *jobtypes.Job, log logger.Logger) {
-//     log.Info("📡 开始模拟Manager任务调度", "jobId", job.ID, "interval", "60s")
-//
-//     // 创建任务响应监听器 (模拟发送结果回Manager)
-//     eventListener := &ManagerResponseSimulator{
-//             logger:    log,
-//             jobID:     job.ID,
-//             monitorID: job.MonitorID,
-//     }
-//
-//     // 立即发送第一个任务 (模拟Manager初始调度)
-//     log.Info("📤 Manager发送初始任务", "timestamp", time.Now().Format("15:04:05"))
-//     if err := collectServer.ReceiveJob(job, eventListener); err != nil {
-//             log.Error(err, "❌ 接收初始任务失败")
-//             return
-//     }
-//
-//     // 模拟Manager定期发送任务更新/重新调度
-//     ticker := time.NewTicker(60 * time.Second) // 每60秒模拟一次Manager调度
-//     defer ticker.Stop()
-//
-//     taskSequence := 1
-//
-//     for {
-//             select {
-//             case <-ticker.C:
-//                     taskSequence++
-//
-//                     // 创建任务更新 (模拟Manager重新调度或配置更新)
-//                     updatedJob := *job
-//                     updatedJob.Timestamp = time.Now().Unix()
-//                     updatedJob.ID = job.ID + int64(taskSequence)
-//
-//                     // 更新任务元数据 (模拟Manager的管理信息)
-//                     updatedJob.Metadata["task_sequence"] = fmt.Sprintf("%d", taskSequence)
-//                     updatedJob.Metadata["last_scheduled"] = time.Now().Format(time.RFC3339)
-//                     updatedJob.Metadata["scheduler_version"] = "v1.0.0"
-//
-//                     log.Info("📤 Manager发送任务更新",
-//                             "sequence", taskSequence,
-//                             "jobId", updatedJob.ID,
-//                             "timestamp", time.Now().Format("15:04:05"))
-//
-//                     // 发送更新任务到Collector
-//                     if err := collectServer.ReceiveJob(&updatedJob, eventListener); err != nil {
-//                             log.Error(err, "❌ 接收任务更新失败", "sequence", taskSequence)
-//                     }
-//
-//             case <-ctx.Done():
-//                     log.Info("🛑 停止模拟Manager任务调度")
-//                     return
-//             }
-//     }
-//}
-//
-//// ManagerResponseSimulator 模拟Manager接收采集结果的响应处理器
-//type ManagerResponseSimulator struct {
-//     logger    logger.Logger
-//     jobID     int64
-//     monitorID int64
-//}
-//
-//// Response 处理采集结果 (模拟发送给Manager的过程)
-//func (mrs *ManagerResponseSimulator) Response(metricsData []interface{}) {
-//     mrs.logger.Info("📨 Collector采集完成,准备发送结果到Manager",
-//             "jobId", mrs.jobID,
-//             "monitorId", mrs.monitorID,
-//             "metricsCount", len(metricsData))
-//
-//     // 模拟处理每个采集指标的结果
-//     for i, data := range metricsData {
-//             if collectData, ok := data.(*jobtypes.CollectRepMetricsData); ok {
-//                     mrs.logger.Info("📊 处理采集指标",
-//                             "metricIndex", i+1,
-//                             "metricName", collectData.Metrics,
-//                             "code", collectData.Code,
-//                             "time", collectData.Time)
-//
-//                     if collectData.Code == 200 {
-//                             mrs.logger.Info("✅ 指标采集成功",
-//                                     "metric", collectData.Metrics,
-//                                     "fieldsCount", len(collectData.Fields),
-//                                     "valuesCount", len(collectData.Values))
-//
-//                             // 打印采集数据 (模拟发送到Manager的数据)
-//                             for rowIndex, valueRow := range collectData.Values {
-//                                     mrs.logger.Info("📈 采集数据",
-//                                             "metric", collectData.Metrics,
-//                                             "row", rowIndex+1,
-//                                             "values", valueRow.Columns)
-//                             }
-//                     } else {
-//                             mrs.logger.Error(fmt.Errorf("采集失败"), "❌ 指标采集错误",
-//                                     "metric", collectData.Metrics,
-//                                     "code", collectData.Code,
-//                                     "message", collectData.Msg)
-//                     }
-//             }
-//     }
-//
-//     // 模拟发送HTTP响应到Manager
-//     mrs.simulateHTTPResponseToManager(metricsData)
-//}
-//
-//// simulateHTTPResponseToManager 模拟通过HTTP将采集结果发送回Manager
-//func (mrs *ManagerResponseSimulator) simulateHTTPResponseToManager(metricsData []interface{}) {
-//     // 模拟HTTP请求参数
-//     managerEndpoint := "http://hertzbeat-manager:1157/api/collector/collect";
-//     collectorIdentity := "collector-go-001"
-//
-//     mrs.logger.Info("🚀 模拟HTTP请求发送采集结果",
-//             "endpoint", managerEndpoint,
-//             "collectorId", collectorIdentity,
-//             "jobId", mrs.jobID,
-//             "dataCount", len(metricsData),
-//             "timestamp", time.Now().Format("15:04:05"))
-//
-//     // 这里可以添加真实的HTTP客户端代码
-//     // httpClient.Post(managerEndpoint, jsonData)
-//
-//     mrs.logger.Info("📤 采集结果已发送到Manager (模拟成功)",
-//             "status", "200 OK",
-//             "responseTime", "15ms")
-//}
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 3b88e5b..81a8bfe 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -30,6 +30,7 @@ import (
        "github.com/spf13/cobra"
 
        bannerouter "hertzbeat.apache.org/hertzbeat-collector-go/internal/banner"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
        jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
        clrServer "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
        transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
@@ -109,6 +110,11 @@ func server(ctx context.Context, logOut io.Writer) error {
 
 func startRunners(ctx context.Context, cfg *clrServer.Server) error {
 
+       // Initialize all collectors before starting runners
+       // This ensures all protocol collectors are registered and ready
+       cfg.Logger.Info("Initializing collectors...")
+       basic.InitializeAllCollectors(cfg.Logger)
+
        runners := []struct {
                runner Runner[collectortypes.Info]
        }{
diff --git a/internal/collector/basic/abstract_collect.go b/internal/collector/basic/abstract_collect.go
deleted file mode 100644
index c67e618..0000000
--- a/internal/collector/basic/abstract_collect.go
+++ /dev/null
@@ -1,119 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package basic
-
-import (
-       jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/timer"
-)
-
-// AbstractCollector defines the interface for all collectors
-type AbstractCollector interface {
-       // PreCheck validates metrics configuration
-       PreCheck(metrics *jobtypes.Metrics) error
-
-       // Collect performs the actual metrics collection
-       Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData
-
-       // SupportProtocol returns the protocol this collector supports
-       SupportProtocol() string
-}
-
-// BaseCollector provides common functionality for all collectors
-type BaseCollector struct {
-       Logger logger.Logger
-}
-
-// NewBaseCollector creates a new base collector
-func NewBaseCollector(logger logger.Logger) *BaseCollector {
-       return &BaseCollector{
-               Logger: logger.WithName("base-collector"),
-       }
-}
-
-// CreateSuccessResponse creates a successful metrics data response
-func (bc *BaseCollector) CreateSuccessResponse(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData {
-       return &jobtypes.CollectRepMetricsData{
-               ID:        0,  // Will be set by the calling context
-               MonitorID: 0,  // Will be set by the calling context
-               App:       "", // Will be set by the calling context
-               Metrics:   metrics.Name,
-               Priority:  0,
-               Time:      timer.GetCurrentTimeMillis(),
-               Code:      200, // Success
-               Msg:       "success",
-               Fields:    make([]jobtypes.Field, 0),
-               Values:    make([]jobtypes.ValueRow, 0),
-               Labels:    make(map[string]string),
-               Metadata:  make(map[string]string),
-       }
-}
-
-// CreateFailResponse creates a failed metrics data response
-func (bc *BaseCollector) CreateFailResponse(metrics *jobtypes.Metrics, code int, message string) *jobtypes.CollectRepMetricsData {
-       return &jobtypes.CollectRepMetricsData{
-               ID:        0,  // Will be set by the calling context
-               MonitorID: 0,  // Will be set by the calling context
-               App:       "", // Will be set by the calling context
-               Metrics:   metrics.Name,
-               Priority:  0,
-               Time:      timer.GetCurrentTimeMillis(),
-               Code:      code,
-               Msg:       message,
-               Fields:    make([]jobtypes.Field, 0),
-               Values:    make([]jobtypes.ValueRow, 0),
-               Labels:    make(map[string]string),
-               Metadata:  make(map[string]string),
-       }
-}
-
-// CollectorRegistry manages registered collectors
-type CollectorRegistry struct {
-       collectors map[string]AbstractCollector
-       logger     logger.Logger
-}
-
-// NewCollectorRegistry creates a new collector registry
-func NewCollectorRegistry(logger logger.Logger) *CollectorRegistry {
-       return &CollectorRegistry{
-               collectors: make(map[string]AbstractCollector),
-               logger:     logger.WithName("collector-registry"),
-       }
-}
-
-// Register registers a collector for a specific protocol
-func (cr *CollectorRegistry) Register(protocol string, collector AbstractCollector) {
-       cr.collectors[protocol] = collector
-       cr.logger.Info("registered collector", "protocol", protocol)
-}
-
-// GetCollector gets a collector for the specified protocol
-func (cr *CollectorRegistry) GetCollector(protocol string) (AbstractCollector, bool) {
-       collector, exists := cr.collectors[protocol]
-       return collector, exists
-}
-
-// GetSupportedProtocols returns all supported protocols
-func (cr *CollectorRegistry) GetSupportedProtocols() []string {
-       protocols := make([]string, 0, len(cr.collectors))
-       for protocol := range cr.collectors {
-               protocols = append(protocols, protocol)
-       }
-       return protocols
-}
diff --git a/internal/collector/basic/database/jdbc_collector.go b/internal/collector/basic/database/jdbc_collector.go
index 361ccc9..01fa7f2 100644
--- a/internal/collector/basic/database/jdbc_collector.go
+++ b/internal/collector/basic/database/jdbc_collector.go
@@ -30,8 +30,6 @@ import (
        _ "github.com/microsoft/go-mssqldb"
        jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
 )
 
 const (
@@ -59,13 +57,13 @@ const (
 
 // JDBCCollector implements JDBC database collection
 type JDBCCollector struct {
-       *basic.BaseCollector
+       logger logger.Logger
 }
 
 // NewJDBCCollector creates a new JDBC collector
 func NewJDBCCollector(logger logger.Logger) *JDBCCollector {
        return &JDBCCollector{
-               BaseCollector: basic.NewBaseCollector(logger.WithName("jdbc-collector")),
+               logger: logger.WithName("jdbc-collector"),
        }
 }
 
@@ -127,7 +125,7 @@ func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRep
        startTime := time.Now()
        jdbc := metrics.JDBC
 
-       jc.Logger.Info("starting JDBC collection",
+       jc.logger.Info("starting JDBC collection",
                "host", jdbc.Host,
                "port", jdbc.Port,
                "platform", jdbc.Platform,
@@ -140,20 +138,20 @@ func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRep
        // Get database URL
        databaseURL, err := jc.constructDatabaseURL(jdbc)
        if err != nil {
-               jc.Logger.Error(err, "failed to construct database URL")
-               return jc.CreateFailResponse(metrics, CodeFail, fmt.Sprintf("Database URL error: %v", err))
+               jc.logger.Error(err, "failed to construct database URL")
+               return jc.createFailResponse(metrics, CodeFail, fmt.Sprintf("Database URL error: %v", err))
        }
 
        // Create database connection
        db, err := jc.getConnection(databaseURL, jdbc.Username, jdbc.Password, timeout)
        if err != nil {
-               jc.Logger.Error(err, "failed to connect to database")
-               return jc.CreateFailResponse(metrics, CodeUnConnectable, fmt.Sprintf("Connection error: %v", err))
+               jc.logger.Error(err, "failed to connect to database")
+               return jc.createFailResponse(metrics, CodeUnConnectable, fmt.Sprintf("Connection error: %v", err))
        }
        defer db.Close()
 
        // Execute query based on type with context
-       response := jc.CreateSuccessResponse(metrics)
+       response := jc.createSuccessResponse(metrics)
 
        switch jdbc.QueryType {
        case QueryTypeOneRow:
@@ -169,20 +167,20 @@ func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRep
        }
 
        if err != nil {
-               jc.Logger.Error(err, "query execution failed", "queryType", jdbc.QueryType)
-               return jc.CreateFailResponse(metrics, CodeFail, fmt.Sprintf("Query error: %v", err))
+               jc.logger.Error(err, "query execution failed", "queryType", jdbc.QueryType)
+               return jc.createFailResponse(metrics, CodeFail, fmt.Sprintf("Query error: %v", err))
        }
 
        duration := time.Since(startTime)
-       jc.Logger.Info("JDBC collection completed",
+       jc.logger.Info("JDBC collection completed",
                "duration", duration,
                "rowCount", len(response.Values))
 
        return response
 }
 
-// SupportProtocol returns the protocol this collector supports
-func (jc *JDBCCollector) SupportProtocol() string {
+// Protocol returns the protocol this collector supports
+func (jc *JDBCCollector) Protocol() string {
        return ProtocolJDBC
 }
 
@@ -467,6 +465,42 @@ func (jc *JDBCCollector) queryColumns(db *sql.DB, sqlQuery string, aliasFields [
 // runScript executes a SQL script (placeholder implementation)
 func (jc *JDBCCollector) runScript(db *sql.DB, scriptPath string, response *jobtypes.CollectRepMetricsData) error {
        // For security reasons, we'll just return success without actually running scripts
-       jc.Logger.Info("script execution requested but disabled for security", "scriptPath", scriptPath)
+       jc.logger.Info("script execution requested but disabled for security", "scriptPath", scriptPath)
        return nil
 }
+
+// createSuccessResponse creates a successful metrics data response
+func (jc *JDBCCollector) createSuccessResponse(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData {
+       return &jobtypes.CollectRepMetricsData{
+               ID:        0,  // Will be set by the calling context
+               MonitorID: 0,  // Will be set by the calling context
+               App:       "", // Will be set by the calling context
+               Metrics:   metrics.Name,
+               Priority:  0,
+               Time:      time.Now().UnixMilli(),
+               Code:      200, // Success
+               Msg:       "success",
+               Fields:    make([]jobtypes.Field, 0),
+               Values:    make([]jobtypes.ValueRow, 0),
+               Labels:    make(map[string]string),
+               Metadata:  make(map[string]string),
+       }
+}
+
+// createFailResponse creates a failed metrics data response
+func (jc *JDBCCollector) createFailResponse(metrics *jobtypes.Metrics, code int, message string) *jobtypes.CollectRepMetricsData {
+       return &jobtypes.CollectRepMetricsData{
+               ID:        0,  // Will be set by the calling context
+               MonitorID: 0,  // Will be set by the calling context
+               App:       "", // Will be set by the calling context
+               Metrics:   metrics.Name,
+               Priority:  0,
+               Time:      time.Now().UnixMilli(),
+               Code:      code,
+               Msg:       message,
+               Fields:    make([]jobtypes.Field, 0),
+               Values:    make([]jobtypes.ValueRow, 0),
+               Labels:    make(map[string]string),
+               Metadata:  make(map[string]string),
+       }
+}
diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go
new file mode 100644
index 0000000..74c4aa6
--- /dev/null
+++ b/internal/collector/basic/init.go
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package basic
+
+import (
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// init 函数在包被导入时自动执行
+// 集中注册所有协议的工厂函数
+func init() {
+       // 注册所有协议的工厂函数
+       // 新增协议时只需要在这里添加一行
+       strategy.RegisterFactory("jdbc", func(logger logger.Logger) strategy.Collector {
+               return database.NewJDBCCollector(logger)
+       })
+
+       // 未来可以在这里添加更多协议:
+       // strategy.RegisterFactory("http", func(logger logger.Logger) strategy.Collector {
+       //     return http.NewHTTPCollector(logger)
+       // })
+       // strategy.RegisterFactory("redis", func(logger logger.Logger) strategy.Collector {
+       //     return redis.NewRedisCollector(logger)
+       // })
+}
+
+// InitializeAllCollectors 初始化所有已注册的采集器
+// 此时init()函数已经注册了所有工厂函数
+// 这个函数创建实际的采集器实例
+func InitializeAllCollectors(logger logger.Logger) {
+       logger.Info("initializing all collectors")
+
+       // 使用已注册的工厂函数创建采集器实例
+       strategy.InitializeCollectors(logger)
+
+       logger.Info("all collectors initialized successfully")
+}
diff --git a/internal/collector/basic/registry/.keep b/internal/collector/basic/registry/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go b/internal/collector/common/collect/dispatch/metrics_collector.go
new file mode 100644
index 0000000..b569858
--- /dev/null
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package dispatch
+
+import (
+       "fmt"
+       "time"
+
+       _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// MetricsCollector runs individual metrics collections, each in its own
+// goroutine, and hands the result back over a channel.
+type MetricsCollector struct {
+       // logger is the named logger used for all collection log output.
+       logger logger.Logger
+}
+
+// NewMetricsCollector builds a MetricsCollector whose log output is emitted
+// under the "metrics-collector" logger name.
+func NewMetricsCollector(logger logger.Logger) *MetricsCollector {
+       mc := new(MetricsCollector)
+       mc.logger = logger.WithName("metrics-collector")
+       return mc
+}
+
+// CollectMetrics collects one metrics definition with the collector registered
+// for metrics.Protocol and delivers the outcome on the returned channel.
+//
+// The work runs in a separate goroutine. The returned channel has capacity 1,
+// receives exactly one value and is then closed, so the goroutine never blocks
+// even when the caller stops listening. The delivered value is never nil: a
+// missing collector or a collector that returns nil is reported as an error
+// response with code 500. On success the collector's result is enriched with
+// the job's identifiers, labels and metadata.
+//
+// The timeout argument is accepted for interface compatibility and is not used
+// here.
+func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job *jobtypes.Job, timeout *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData {
+       resultChan := make(chan *jobtypes.CollectRepMetricsData, 1)
+
+       // Start collection in a goroutine so the caller can wait with a timeout.
+       go func() {
+               defer close(resultChan)
+
+               mc.logger.Info("starting metrics collection",
+                       "jobID", job.ID,
+                       "metricsName", metrics.Name,
+                       "protocol", metrics.Protocol)
+
+               startTime := time.Now()
+
+               // Get the appropriate collector based on protocol.
+               collector, err := strategy.CollectorFor(metrics.Protocol)
+               if err != nil {
+                       mc.logger.Error(err, "failed to get collector",
+                               "protocol", metrics.Protocol,
+                               "metricsName", metrics.Name)
+
+                       resultChan <- mc.createErrorResponse(metrics, job, 500, fmt.Sprintf("Collector not found: %v", err))
+                       return
+               }
+
+               // Perform the actual collection.
+               result := collector.Collect(metrics)
+
+               // A collector must not return nil, but guard against it: without
+               // this check the failure logging below would dereference nil and
+               // panic inside this goroutine, taking the whole process down.
+               if result == nil {
+                       const nilResultMsg = "collector returned nil result"
+                       mc.logger.Info("metrics collection failed",
+                               "jobID", job.ID,
+                               "metricsName", metrics.Name,
+                               "protocol", metrics.Protocol,
+                               "duration", time.Since(startTime),
+                               "code", 500,
+                               "message", nilResultMsg)
+
+                       resultChan <- mc.createErrorResponse(metrics, job, 500, nilResultMsg)
+                       return
+               }
+
+               // Enrich result with job information.
+               result.ID = job.ID
+               result.MonitorID = job.MonitorID
+               result.App = job.App
+               result.TenantID = job.TenantID
+
+               // Job labels are merged into (and override) the result's labels.
+               if result.Labels == nil {
+                       result.Labels = make(map[string]string)
+               }
+               for k, v := range job.Labels {
+                       result.Labels[k] = v
+               }
+
+               // Job metadata is merged into (and overrides) the result's metadata.
+               if result.Metadata == nil {
+                       result.Metadata = make(map[string]string)
+               }
+               for k, v := range job.Metadata {
+                       result.Metadata[k] = v
+               }
+
+               duration := time.Since(startTime)
+
+               if result.Code == 200 {
+                       mc.logger.Info("metrics collection completed successfully",
+                               "jobID", job.ID,
+                               "metricsName", metrics.Name,
+                               "protocol", metrics.Protocol,
+                               "duration", duration,
+                               "valuesCount", len(result.Values))
+               } else {
+                       mc.logger.Info("metrics collection failed",
+                               "jobID", job.ID,
+                               "metricsName", metrics.Name,
+                               "protocol", metrics.Protocol,
+                               "duration", duration,
+                               "code", result.Code,
+                               "message", result.Msg)
+               }
+
+               resultChan <- result
+       }()
+
+       return resultChan
+}
+
+// createErrorResponse builds the response reported for a collection that could
+// not be carried out. It copies the identifying fields from job and metrics,
+// stamps the current time in Unix milliseconds, stores the given code and
+// message, and leaves fields, values, labels and metadata empty but non-nil.
+func (mc *MetricsCollector) createErrorResponse(metrics *jobtypes.Metrics, job *jobtypes.Job, code int, message string) *jobtypes.CollectRepMetricsData {
+       resp := &jobtypes.CollectRepMetricsData{}
+
+       // Identity of the job and the metrics that failed.
+       resp.ID = job.ID
+       resp.MonitorID = job.MonitorID
+       resp.TenantID = job.TenantID
+       resp.App = job.App
+       resp.Metrics = metrics.Name
+       resp.Priority = metrics.Priority
+
+       // Failure details.
+       resp.Time = time.Now().UnixMilli()
+       resp.Code = code
+       resp.Msg = message
+
+       // Empty, non-nil containers.
+       resp.Fields = []jobtypes.Field{}
+       resp.Values = []jobtypes.ValueRow{}
+       resp.Labels = map[string]string{}
+       resp.Metadata = map[string]string{}
+
+       return resp
+}
diff --git a/internal/collector/common/collect/result_handler.go 
b/internal/collector/common/collect/result_handler.go
new file mode 100644
index 0000000..701ca63
--- /dev/null
+++ b/internal/collector/common/collect/result_handler.go
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package collect
+
+import (
+       "fmt"
+
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// ResultHandlerImpl is the ResultHandler implementation of this package. At
+// the moment it only logs the collection results it is given.
+type ResultHandlerImpl struct {
+       // logger is the named logger used for all result-handling log output.
+       logger logger.Logger
+       // TODO: Add data queue or storage interface when needed
+}
+
+// NewResultHandler builds a ResultHandlerImpl whose log output is emitted
+// under the "result-handler" logger name.
+func NewResultHandler(logger logger.Logger) *ResultHandlerImpl {
+       rh := new(ResultHandlerImpl)
+       rh.logger = logger.WithName("result-handler")
+       return rh
+}
+
+// HandleCollectData processes one collection result for the given job.
+//
+// It returns an error when data is nil. Otherwise it logs a summary of the
+// result, logs whether the collection succeeded (code 200) or failed, and
+// returns nil. No further processing is implemented yet.
+func (rh *ResultHandlerImpl) HandleCollectData(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error {
+       if data == nil {
+               rh.logger.Error(nil, "collect data is nil")
+               return fmt.Errorf("collect data is nil")
+       }
+
+       rh.logger.Info("handling collect data",
+               "jobID", job.ID,
+               "monitorID", job.MonitorID,
+               "metricsName", data.Metrics,
+               "code", data.Code,
+               "valuesCount", len(data.Values))
+
+       // TODO: Implement actual data processing logic
+       // This could include:
+       // 1. Data validation and transformation
+       // 2. Sending to message queue
+       // 3. Storing to database
+       // 4. Triggering alerts based on thresholds
+       // 5. Updating monitoring status
+
+       // Failed collections are only reported; they are not an error here.
+       if data.Code != 200 {
+               rh.logger.Info("received failed collect data",
+                       "jobID", job.ID,
+                       "metricsName", data.Metrics,
+                       "code", data.Code,
+                       "message", data.Msg)
+               return nil
+       }
+
+       rh.logger.Info("successfully processed collect data",
+               "jobID", job.ID,
+               "metricsName", data.Metrics)
+       return nil
+}
diff --git a/internal/collector/common/collect/strategy/strategy_factory.go 
b/internal/collector/common/collect/strategy/strategy_factory.go
index 17e4874..fa951b0 100644
--- a/internal/collector/common/collect/strategy/strategy_factory.go
+++ b/internal/collector/common/collect/strategy/strategy_factory.go
@@ -18,39 +18,112 @@
 package strategy
 
 import (
+       "fmt"
        "sync"
+
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
-// AbstractCollect interface
-type AbstractCollect interface {
-       SupportProtocol() string
+// Collector is the contract every protocol-specific collector implements.
+// Instances are stored in the registry under the protocol they report.
+type Collector interface {
+       // Collect performs the metrics collection described by metrics and
+       // returns the collected data.
+       Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData
+
+       // Protocol returns the protocol this collector supports.
+       Protocol() string
 }
 
-// strategy container
-var (
-       collectStrategy = make(map[string]AbstractCollect)
-       mu              sync.RWMutex
-)
+// CollectorFactory builds a Collector from the logger it is given. Factories
+// are registered with RegisterFactory and invoked by InitializeCollectors.
+type CollectorFactory func(logger.Logger) Collector
 
-// Register a collect strategy
-// Example: registration in init() of each implementation file
-//
-//     func init() {
-//         Register(&XXXCollect{})
-//     }
-func Register(collect AbstractCollect) {
+// CollectorRegistry holds the registered collector factories and the collector
+// instances, both keyed by protocol name.
+type CollectorRegistry struct {
+       // factories maps a protocol name to the function that builds its collector.
+       factories   map[string]CollectorFactory
+       // collectors maps a protocol name to its collector instance.
+       collectors  map[string]Collector
+       // mu guards the maps and the initialized flag.
+       mu          sync.RWMutex
+       // initialized is set once InitializeCollectors has run, making later
+       // calls no-ops.
+       initialized bool
+}
 
-       mu.Lock()
-       defer mu.Unlock()
+// globalRegistry is the package-wide registry that all package-level
+// registration and lookup functions operate on.
+var globalRegistry = &CollectorRegistry{
+       factories:  map[string]CollectorFactory{},
+       collectors: map[string]Collector{},
+}
 
-       collectStrategy[collect.SupportProtocol()] = collect
+// RegisterFactory stores the factory function used to build the collector for
+// protocol. It is intended to be called from init() functions; the collector
+// itself is only created later by InitializeCollectors.
+func RegisterFactory(protocol string, factory CollectorFactory) {
+       globalRegistry.mu.Lock()
+       defer globalRegistry.mu.Unlock()
+
+       // A later registration for the same protocol replaces the earlier one.
+       globalRegistry.factories[protocol] = factory
 }
 
-// Invoke returns the collect strategy for a protocol
-func Invoke(protocol string) AbstractCollect {
+// InitializeCollectors builds one collector per registered factory, passing
+// each factory the given logger, and stores it under the factory's protocol.
+// Only the first call does any work; subsequent calls return immediately.
+// After building the collectors it logs every protocol present in the
+// registry together with the total count.
+func InitializeCollectors(logger logger.Logger) {
+       reg := globalRegistry
+
+       reg.mu.Lock()
+       defer reg.mu.Unlock()
+
+       // Already initialized: nothing to do.
+       if reg.initialized {
+               return
+       }
+
+       for name, newCollector := range reg.factories {
+               reg.collectors[name] = newCollector(logger)
+       }
+       reg.initialized = true
+
+       // Report everything that is now available, including collectors that
+       // were added through Register before this call.
+       names := make([]string, 0, len(reg.collectors))
+       for name := range reg.collectors {
+               names = append(names, name)
+       }
+       logger.Info("collectors initialized from factories",
+               "protocols", names,
+               "count", len(names))
+}
+
+// Register stores an already constructed collector under the protocol it
+// reports, replacing any collector previously stored for that protocol.
+// It is a legacy entry point kept for compatibility.
+func Register(collector Collector) {
+       globalRegistry.mu.Lock()
+       defer globalRegistry.mu.Unlock()
+
+       globalRegistry.collectors[collector.Protocol()] = collector
+}
+
+// CollectorFor looks up the collector instance stored for protocol. It returns
+// an error when no collector is stored under that name.
+func CollectorFor(protocol string) (Collector, error) {
+       globalRegistry.mu.RLock()
+       defer globalRegistry.mu.RUnlock()
+
+       if found, ok := globalRegistry.collectors[protocol]; ok {
+               return found, nil
+       }
+       return nil, fmt.Errorf("no collector found for protocol: %s", protocol)
+}
+
+// SupportedProtocols lists the protocol names that currently have a collector
+// instance. The order follows map iteration and is therefore unspecified.
+func SupportedProtocols() []string {
+       reg := globalRegistry
+
+       reg.mu.RLock()
+       defer reg.mu.RUnlock()
+
+       names := make([]string, 0, len(reg.collectors))
+       for name := range reg.collectors {
+               names = append(names, name)
+       }
+       return names
+}
 
-       mu.RLock()
-       defer mu.RUnlock()
+// CollectorCount returns how many collector instances are currently stored in
+// the global registry. Factories that have not been turned into instances yet
+// are not counted.
+func CollectorCount() int {
+       globalRegistry.mu.RLock()
+       defer globalRegistry.mu.RUnlock()
 
-       return collectStrategy[protocol]
+       return len(globalRegistry.collectors)
 }
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go 
b/internal/collector/common/dispatcher/common_dispatcher.go
new file mode 100644
index 0000000..d7a58c0
--- /dev/null
+++ b/internal/collector/common/dispatcher/common_dispatcher.go
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package dispatcher
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// CommonDispatcherImpl breaks a job down into individual metrics collection
+// tasks, waits for their results with a timeout and passes the results on to
+// the result handler.
+type CommonDispatcherImpl struct {
+       // logger is the named logger used for all dispatcher log output.
+       logger           logger.Logger
+       // metricsCollector starts the collection of a single metrics definition.
+       metricsCollector MetricsCollector
+       // resultHandler receives every collected result.
+       resultHandler    ResultHandler
+       // ctx is the parent of every per-dispatch timeout context.
+       ctx              context.Context
+       // cancel cancels ctx; it is called by Stop.
+       cancel           context.CancelFunc
+       // NOTE(review): mu is not used by any method visible in this file section.
+       mu               sync.RWMutex
+}
+
+// MetricsCollector is what the dispatcher needs from a metrics collector.
+type MetricsCollector interface {
+       // CollectMetrics starts collecting one metrics definition of job and
+       // returns the channel on which the result will be delivered.
+       CollectMetrics(metrics *jobtypes.Metrics, job *jobtypes.Job, timeout 
*jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData
+}
+
+// ResultHandler is what the dispatcher needs from a consumer of collection
+// results.
+type ResultHandler interface {
+       // HandleCollectData processes one collection result of job and reports
+       // an error when the result could not be handled.
+       HandleCollectData(data *jobtypes.CollectRepMetricsData, job 
*jobtypes.Job) error
+}
+
+// NewCommonDispatcher wires a dispatcher from the given collector and result
+// handler. The dispatcher logs under the "common-dispatcher" name and owns a
+// cancellable context that stays alive until Stop is called.
+func NewCommonDispatcher(logger logger.Logger, metricsCollector MetricsCollector, resultHandler ResultHandler) *CommonDispatcherImpl {
+       cd := &CommonDispatcherImpl{
+               logger:           logger.WithName("common-dispatcher"),
+               metricsCollector: metricsCollector,
+               resultHandler:    resultHandler,
+       }
+       cd.ctx, cd.cancel = context.WithCancel(context.Background())
+       return cd
+}
+
+// DispatchMetricsTask collects the next batch of metrics of job and hands the
+// results to the result handler.
+//
+// It returns an error when job is nil, and nil when the job has nothing to
+// collect. Otherwise one collection is started per metrics definition, all of
+// them running concurrently, and the method blocks until every collection has
+// delivered a result or the collection timeout (see getCollectionTimeout) has
+// expired or the dispatcher has been stopped. Collections that did not deliver
+// in time are recorded as errors; they do not fail the dispatch.
+func (cd *CommonDispatcherImpl) DispatchMetricsTask(job *jobtypes.Job, timeout *jobtypes.Timeout) error {
+       if job == nil {
+               return fmt.Errorf("job cannot be nil")
+       }
+
+       cd.logger.Info("dispatching metrics task",
+               "jobID", job.ID,
+               "monitorID", job.MonitorID,
+               "app", job.App,
+               "metricsCount", len(job.Metrics))
+
+       startTime := time.Now()
+
+       // Get the next metrics to collect based on job priority and dependencies.
+       metricsToCollect := job.NextCollectMetrics()
+       if len(metricsToCollect) == 0 {
+               cd.logger.Info("no metrics to collect", "jobID", job.ID)
+               return nil
+       }
+
+       // Create collection context with timeout.
+       collectCtx, collectCancel := context.WithTimeout(cd.ctx, cd.getCollectionTimeout(job))
+       defer collectCancel()
+
+       // Start every collection first so that they all run concurrently.
+       resultChannels := make([]chan *jobtypes.CollectRepMetricsData, len(metricsToCollect))
+       for i, metrics := range metricsToCollect {
+               cd.logger.Info("starting metrics collection",
+                       "jobID", job.ID,
+                       "metricsName", metrics.Name,
+                       "protocol", metrics.Protocol)
+
+               resultChannels[i] = cd.metricsCollector.CollectMetrics(metrics, job, timeout)
+       }
+
+       // Collect results from all metrics collection tasks.
+       results := make([]*jobtypes.CollectRepMetricsData, 0, len(resultChannels))
+       var collectErrs []error
+
+       for i, resultChan := range resultChannels {
+               var (
+                       result   *jobtypes.CollectRepMetricsData
+                       received bool
+               )
+
+               select {
+               case result = <-resultChan:
+                       received = true
+               case <-collectCtx.Done():
+                       // select picks pseudo-randomly among ready cases, so once the
+                       // context is done a result that is already waiting in the
+                       // channel could be reported as a timeout. Take it if present.
+                       select {
+                       case result = <-resultChan:
+                               received = true
+                       default:
+                       }
+               }
+
+               if !received {
+                       collectErrs = append(collectErrs, fmt.Errorf("timeout collecting metrics: %s", metricsToCollect[i].Name))
+                       cd.logger.Info("metrics collection timeout",
+                               "jobID", job.ID,
+                               "metricsName", metricsToCollect[i].Name)
+                       continue
+               }
+
+               // A nil value (for example from a channel closed without a result)
+               // carries no data and is skipped.
+               if result != nil {
+                       results = append(results, result)
+                       cd.logger.Info("received metrics result",
+                               "jobID", job.ID,
+                               "metricsName", metricsToCollect[i].Name,
+                               "code", result.Code)
+               }
+       }
+
+       duration := time.Since(startTime)
+
+       // Handle collection results.
+       if err := cd.handleResults(results, job, collectErrs); err != nil {
+               cd.logger.Error(err, "failed to handle collection results",
+                       "jobID", job.ID,
+                       "duration", duration)
+               return err
+       }
+
+       cd.logger.Info("successfully dispatched metrics task",
+               "jobID", job.ID,
+               "resultsCount", len(results),
+               "errorsCount", len(collectErrs),
+               "duration", duration)
+
+       return nil
+}
+
+// handleResults forwards every collected result to the result handler, logs
+// every collection error and then evaluates follow-up actions for the job.
+// Failures of individual results or collections are only logged; the method
+// always returns nil.
+func (cd *CommonDispatcherImpl) handleResults(results []*jobtypes.CollectRepMetricsData, job *jobtypes.Job, errors []error) error {
+       cd.logger.Info("handling collection results",
+               "jobID", job.ID,
+               "resultsCount", len(results),
+               "errorsCount", len(errors))
+
+       // Hand over each result; one failing result must not stop the others.
+       for i := range results {
+               res := results[i]
+               handleErr := cd.resultHandler.HandleCollectData(res, job)
+               if handleErr == nil {
+                       continue
+               }
+               cd.logger.Error(handleErr, "failed to handle collect data",
+                       "jobID", job.ID,
+                       "metricsName", res.Metrics)
+       }
+
+       // Collection errors are reported but do not fail the entire job.
+       for i := range errors {
+               cd.logger.Error(errors[i], "metrics collection error", "jobID", job.ID)
+       }
+
+       // TODO: Implement logic to decide if we need to trigger next level metrics
+       // or next round scheduling based on results
+       cd.evaluateNextActions(job, results, errors)
+
+       return nil
+}
+
+// evaluateNextActions decides what happens after a collection round. It logs
+// whether dependent metrics remain and whether the job is cyclic, then passes
+// the results on to sendToDataQueue. Scheduling of further work is not
+// implemented here.
+func (cd *CommonDispatcherImpl) evaluateNextActions(job *jobtypes.Job, results []*jobtypes.CollectRepMetricsData, errors []error) {
+       cd.logger.Info("evaluating next actions",
+               "jobID", job.ID,
+               "resultsCount", len(results),
+               "errorsCount", len(errors))
+
+       // Dependent metrics that still need collecting after this round.
+       if cd.hasNextLevelMetrics(job, results) {
+               cd.logger.Info("job has next level metrics to collect", "jobID", job.ID)
+               // TODO: Schedule next level metrics collection
+       }
+
+       // For cyclic jobs, the rescheduling is handled by WheelTimerTask.
+       if job.IsCyclic {
+               cd.logger.Info("cyclic job will be rescheduled by timer", "jobID", job.ID)
+       }
+
+       // TODO: Send results to data queue for further processing
+       cd.sendToDataQueue(results, job)
+}
+
+// hasNextLevelMetrics reports whether dependent metrics should be collected
+// after this round. It is a placeholder that ignores its arguments and always
+// returns false.
+func (cd *CommonDispatcherImpl) hasNextLevelMetrics(job *jobtypes.Job, results 
[]*jobtypes.CollectRepMetricsData) bool {
+       // This is a simplified check - in a full implementation, this would
+       // analyze metric dependencies and priorities to determine if more
+       // metrics need collection.
+       return false
+}
+
+// sendToDataQueue is the hand-over point to the data processing queue. The
+// queue is not implemented yet, so the method only logs the batch and then
+// each individual result.
+func (cd *CommonDispatcherImpl) sendToDataQueue(results []*jobtypes.CollectRepMetricsData, job *jobtypes.Job) {
+       cd.logger.Info("sending results to data queue",
+               "jobID", job.ID,
+               "resultsCount", len(results))
+
+       // TODO: Implement queue sending logic
+       // For now, we'll just log that data should be sent
+       for i := range results {
+               res := results[i]
+               cd.logger.Info("result ready for queue",
+                       "jobID", job.ID,
+                       "metricsName", res.Metrics,
+                       "code", res.Code,
+                       "valuesCount", len(res.Values))
+       }
+}
+
+// getCollectionTimeout calculates the timeout for metrics collection
+func (cd *CommonDispatcherImpl) getCollectionTimeout(job *jobtypes.Job) 
time.Duration {
+       // Default timeout is 30 seconds
+       defaultTimeout := 30 * time.Second
+
+       // If job has a specific timeout, use it
+       if job.DefaultInterval > 0 {
+               // Use 80% of the interval as timeout to allow for rescheduling
+               jobTimeout := time.Duration(job.DefaultInterval) * time.Second 
* 80 / 100
+               if jobTimeout > defaultTimeout {
+                       return jobTimeout
+               }
+       }
+
+       return defaultTimeout
+}
+
+// Stop cancels the dispatcher's context, which also cancels the timeout
+// context of any dispatch currently waiting for results. It always returns
+// nil.
+func (cd *CommonDispatcherImpl) Stop() error {
+       cd.logger.Info("stopping common dispatcher")
+       cd.cancel()
+       return nil
+}
diff --git a/internal/collector/common/dispatcher/hashed_wheel_timer.go 
b/internal/collector/common/dispatcher/hashed_wheel_timer.go
new file mode 100644
index 0000000..026fd2b
--- /dev/null
+++ b/internal/collector/common/dispatcher/hashed_wheel_timer.go
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package dispatcher
+
+import (
+       "context"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// hashedWheelTimer is a time wheel: timeouts are spread over a fixed ring of
+// buckets and a ticker visits one bucket per tick to run the timeouts that
+// have expired.
+type hashedWheelTimer struct {
+       // logger is the named logger used for all timer log output.
+       logger       logger.Logger
+       // tickDuration is the time between two ticks of the wheel.
+       tickDuration time.Duration
+       // wheelSize is the number of buckets in the wheel.
+       wheelSize    int
+       // wheel holds the buckets, indexed by tick number modulo wheelSize.
+       wheel        []*bucket
+       // currentTick counts the ticks processed so far; accessed atomically.
+       currentTick  int64
+       // startTime is recorded by Start.
+       startTime    time.Time
+       // ticker drives the wheel; it is created by Start and nil before that.
+       ticker       *time.Ticker
+       // ctx is cancelled by Stop to end the run loop.
+       ctx          context.Context
+       cancel       context.CancelFunc
+       // started and stopped are 0/1 flags; accessed atomically.
+       started      int32
+       stopped      int32
+       // wg tracks the run loop goroutine so that Stop can wait for it.
+       wg           sync.WaitGroup
+}
+
+// bucket is one slot of the time wheel and holds the timeouts assigned to it.
+type bucket struct {
+       // timeouts are the pending timeouts of this slot.
+       timeouts []*jobtypes.Timeout
+       // mu guards timeouts.
+       mu       sync.RWMutex
+}
+
+// NewHashedWheelTimer builds a timer with wheelSize buckets that advances once
+// per tickDuration. Non-positive arguments fall back to 512 buckets and a
+// 100ms tick. The timer logs under the "hashed-wheel-timer" name and does not
+// tick until Start is called.
+func NewHashedWheelTimer(wheelSize int, tickDuration time.Duration, logger logger.Logger) HashedWheelTimer {
+       if wheelSize <= 0 {
+               wheelSize = 512
+       }
+       if tickDuration <= 0 {
+               tickDuration = 100 * time.Millisecond
+       }
+
+       // Every slot gets its own, initially empty bucket.
+       slots := make([]*bucket, wheelSize)
+       for i := range slots {
+               slots[i] = &bucket{timeouts: []*jobtypes.Timeout{}}
+       }
+
+       hwt := &hashedWheelTimer{
+               logger:       logger.WithName("hashed-wheel-timer"),
+               tickDuration: tickDuration,
+               wheelSize:    wheelSize,
+               wheel:        slots,
+       }
+       hwt.ctx, hwt.cancel = context.WithCancel(context.Background())
+
+       return hwt
+}
+
+// NewTimeout creates a new timeout and adds it to the wheel
+func (hwt *hashedWheelTimer) NewTimeout(task jobtypes.TimerTask, delay 
time.Duration) *jobtypes.Timeout {
+       if atomic.LoadInt32(&hwt.stopped) == 1 {
+               hwt.logger.Info("timer is stopped, cannot create new timeout")
+               return nil
+       }
+
+       timeout := jobtypes.NewTimeout(task, delay)
+
+       // Calculate which bucket and tick this timeout belongs to
+       totalTicks := int64(delay / hwt.tickDuration)
+       currentTick := atomic.LoadInt64(&hwt.currentTick)
+       targetTick := currentTick + totalTicks
+
+       bucketIndex := int(targetTick % int64(hwt.wheelSize))
+       timeout.SetWheelIndex(int(targetTick))
+       timeout.SetBucketIndex(bucketIndex)
+
+       // Add to appropriate bucket
+       bucket := hwt.wheel[bucketIndex]
+       bucket.mu.Lock()
+       bucket.timeouts = append(bucket.timeouts, timeout)
+       bucket.mu.Unlock()
+
+       hwt.logger.Info("created timeout",
+               "delay", delay,
+               "bucketIndex", bucketIndex,
+               "targetTick", targetTick)
+
+       return timeout
+}
+
+// Start records the start time, creates the ticker and launches the run loop
+// in its own goroutine. Only the first call has an effect; later calls return
+// nil without doing anything.
+func (hwt *hashedWheelTimer) Start() error {
+       // The compare-and-swap lets exactly one caller through.
+       if !atomic.CompareAndSwapInt32(&hwt.started, 0, 1) {
+               return nil // already started
+       }
+
+       hwt.logger.Info("starting hashed wheel timer",
+               "wheelSize", hwt.wheelSize,
+               "tickDuration", hwt.tickDuration)
+
+       hwt.startTime = time.Now()
+       hwt.ticker = time.NewTicker(hwt.tickDuration)
+
+       // Registered before the goroutine starts so that Stop can wait for it.
+       hwt.wg.Add(1)
+       go hwt.run()
+
+       hwt.logger.Info("hashed wheel timer started")
+       return nil
+}
+
+// Stop marks the timer as stopped, cancels the run loop, stops the ticker and
+// waits until the run loop has returned. Only the first call has an effect;
+// later calls return nil without doing anything. Timeout tasks that are
+// already executing in their own goroutines are not waited for.
+func (hwt *hashedWheelTimer) Stop() error {
+       // The compare-and-swap lets exactly one caller through.
+       if !atomic.CompareAndSwapInt32(&hwt.stopped, 0, 1) {
+               return nil // already stopped
+       }
+
+       hwt.logger.Info("stopping hashed wheel timer")
+
+       hwt.cancel()
+
+       // The ticker only exists once Start has been called.
+       if hwt.ticker != nil {
+               hwt.ticker.Stop()
+       }
+
+       hwt.wg.Wait()
+
+       hwt.logger.Info("hashed wheel timer stopped")
+       return nil
+}
+
+// run is the main timer loop. It processes one wheel tick for every ticker
+// event and returns once the timer's context has been cancelled.
+func (hwt *hashedWheelTimer) run() {
+       // Signals Stop that the loop has finished.
+       defer hwt.wg.Done()
+
+       for {
+               select {
+               case <-hwt.ctx.Done():
+                       return
+               case <-hwt.ticker.C:
+                       hwt.tick()
+               }
+       }
+}
+
+// tick advances the wheel by one slot and works through that slot's bucket:
+// cancelled timeouts are dropped, timeouts whose deadline has passed are run
+// (each in its own goroutine, with panics recovered and logged), and all
+// others stay in the bucket until it is visited again.
+func (hwt *hashedWheelTimer) tick() {
+       tickNo := atomic.AddInt64(&hwt.currentTick, 1)
+       slot := hwt.wheel[int(tickNo%int64(hwt.wheelSize))]
+
+       slot.mu.Lock()
+       defer slot.mu.Unlock()
+
+       var pending []*jobtypes.Timeout
+       for _, entry := range slot.timeouts {
+               switch {
+               case entry.IsCancelled():
+                       // Cancelled timeouts are simply not carried over.
+               case time.Now().After(entry.Deadline()):
+                       // Expired: run the task without blocking the wheel.
+                       go func(t *jobtypes.Timeout) {
+                               defer func() {
+                                       if r := recover(); r != nil {
+                                               hwt.logger.Error(nil, "panic in timeout task execution", "panic", r)
+                                       }
+                               }()
+
+                               if err := t.Task().Run(t); err != nil {
+                                       hwt.logger.Error(err, "error executing timeout task")
+                               }
+                       }(entry)
+               default:
+                       // Not yet expired, keep in bucket.
+                       pending = append(pending, entry)
+               }
+       }
+
+       slot.timeouts = pending
+}
+
+// GetStats returns a snapshot of the timer's configuration and state: wheel
+// size, tick duration, current tick, the number of timeouts held across all
+// buckets, and the started/stopped flags.
+func (hwt *hashedWheelTimer) GetStats() map[string]interface{} {
+       // Buckets are locked one at a time, so the sum is not one atomic snapshot.
+       pending := 0
+       for i := range hwt.wheel {
+               slot := hwt.wheel[i]
+               slot.mu.RLock()
+               pending += len(slot.timeouts)
+               slot.mu.RUnlock()
+       }
+
+       stats := make(map[string]interface{}, 6)
+       stats["wheelSize"] = hwt.wheelSize
+       stats["tickDuration"] = hwt.tickDuration
+       stats["currentTick"] = atomic.LoadInt64(&hwt.currentTick)
+       stats["totalTimeouts"] = pending
+       stats["started"] = atomic.LoadInt32(&hwt.started) == 1
+       stats["stopped"] = atomic.LoadInt32(&hwt.stopped) == 1
+       return stats
+}
diff --git a/internal/collector/common/dispatcher/time_wheel.go 
b/internal/collector/common/dispatcher/time_wheel.go
new file mode 100644
index 0000000..2040ec0
--- /dev/null
+++ b/internal/collector/common/dispatcher/time_wheel.go
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package dispatcher
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// TimeDispatch manages time-based job scheduling using a hashed wheel timer.
+// Jobs are registered through AddJob, which creates one timeout per job on the
+// wheel timer and remembers it by job ID so RemoveJob and Stop can cancel it.
+type TimeDispatch struct {
+       logger           logger.Logger
+       // wheelTimer schedules the per-job timeouts; created in NewTimeDispatch.
+       wheelTimer       HashedWheelTimer
+       // commonDispatcher is handed to every WheelTimerTask created by AddJob.
+       commonDispatcher MetricsTaskDispatcher
+       // cyclicTasks maps job ID -> *jobtypes.Timeout for jobs with IsCyclic set.
+       cyclicTasks      sync.Map
+       // tempTasks maps job ID -> *jobtypes.Timeout for one-time jobs.
+       tempTasks        sync.Map
+       // ctx/cancel are created in NewTimeDispatch; cancel is invoked by Stop.
+       // NOTE(review): ctx is not read by any method shown in this file.
+       ctx              context.Context
+       cancel           context.CancelFunc
+       // started is set by Start and cleared by Stop, both while holding mu.
+       started          bool
+       mu               sync.RWMutex
+}
+
+// MetricsTaskDispatcher is the interface for metrics task dispatching.
+// WheelTimerTask.Run calls DispatchMetricsTask with its job and the timeout it
+// was invoked with; a non-nil error is logged there and returned to the caller.
+type MetricsTaskDispatcher interface {
+       DispatchMetricsTask(job *jobtypes.Job, timeout *jobtypes.Timeout) error
+}
+
+// HashedWheelTimer is the interface for time wheel operations used by
+// TimeDispatch.
+type HashedWheelTimer interface {
+       // NewTimeout registers task with the given delay and returns the timeout
+       // handle; TimeDispatch keeps that handle so it can call Cancel on it later.
+       // NOTE(review): presumably the task runs once the delay elapses - confirm
+       // against the implementation.
+       NewTimeout(task jobtypes.TimerTask, delay time.Duration) 
*jobtypes.Timeout
+       // Start is called from TimeDispatch.Start; an error aborts that start.
+       Start() error
+       // Stop is called from TimeDispatch.Stop; an error is logged there.
+       Stop() error
+}
+
+// NewTimeDispatch creates a new time dispatcher
+func NewTimeDispatch(logger logger.Logger, commonDispatcher 
MetricsTaskDispatcher) *TimeDispatch {
+       ctx, cancel := context.WithCancel(context.Background())
+
+       td := &TimeDispatch{
+               logger:           logger.WithName("time-dispatch"),
+               commonDispatcher: commonDispatcher,
+               ctx:              ctx,
+               cancel:           cancel,
+               started:          false,
+       }
+
+       // Create hashed wheel timer with reasonable defaults
+       // 512 buckets, 100ms tick duration
+       td.wheelTimer = NewHashedWheelTimer(512, 100*time.Millisecond, logger)
+
+       return td
+}
+
+// AddJob adds a job to the time-based scheduler
+// This corresponds to Java's TimerDispatcher.addJob method
+func (td *TimeDispatch) AddJob(job *jobtypes.Job) error {
+       if job == nil {
+               return fmt.Errorf("job cannot be nil")
+       }
+
+       td.logger.Info("adding job to time dispatcher",
+               "jobID", job.ID,
+               "isCyclic", job.IsCyclic,
+               "interval", job.DefaultInterval)
+
+       // Create wheel timer task
+       timerTask := NewWheelTimerTask(job, td.commonDispatcher, td.logger)
+
+       // Calculate delay
+       var delay time.Duration
+       if job.DefaultInterval > 0 {
+               delay = time.Duration(job.DefaultInterval) * time.Second
+       } else {
+               delay = 30 * time.Second // default interval
+       }
+
+       // Create timeout
+       timeout := td.wheelTimer.NewTimeout(timerTask, delay)
+
+       // Store timeout based on job type
+       if job.IsCyclic {
+               td.cyclicTasks.Store(job.ID, timeout)
+               td.logger.Info("added cyclic job to scheduler", "jobID", 
job.ID, "delay", delay)
+       } else {
+               td.tempTasks.Store(job.ID, timeout)
+               td.logger.Info("added one-time job to scheduler", "jobID", 
job.ID, "delay", delay)
+       }
+
+       return nil
+}
+
+// RemoveJob cancels and forgets the timeout registered for jobID.
+// The cyclic registry is consulted first, then the one-time registry; the
+// first entry that holds a *jobtypes.Timeout is cancelled. An error is
+// returned when neither registry yields such an entry.
+func (td *TimeDispatch) RemoveJob(jobID int64) error {
+       td.logger.Info("removing job from time dispatcher", "jobID", jobID)
+
+       registries := []struct {
+               tasks      *sync.Map
+               removedMsg string
+       }{
+               {&td.cyclicTasks, "removed cyclic job"},
+               {&td.tempTasks, "removed one-time job"},
+       }
+
+       for _, reg := range registries {
+               entry, found := reg.tasks.LoadAndDelete(jobID)
+               if !found {
+                       continue
+               }
+               handle, ok := entry.(*jobtypes.Timeout)
+               if !ok {
+                       continue
+               }
+               handle.Cancel()
+               td.logger.Info(reg.removedMsg, "jobID", jobID)
+               return nil
+       }
+
+       return fmt.Errorf("job not found: %d", jobID)
+}
+
+// Start brings up the underlying wheel timer and marks the dispatcher as
+// started. It fails if the dispatcher is already started or if the wheel
+// timer refuses to start. The ctx argument is currently unused.
+func (td *TimeDispatch) Start(ctx context.Context) error {
+       td.mu.Lock()
+       defer td.mu.Unlock()
+
+       if td.started {
+               return fmt.Errorf("time dispatcher already started")
+       }
+       td.logger.Info("starting time dispatcher")
+
+       // The dispatcher only counts as started once the wheel timer is running.
+       startErr := td.wheelTimer.Start()
+       if startErr != nil {
+               return fmt.Errorf("failed to start wheel timer: %w", startErr)
+       }
+       td.started = true
+
+       td.logger.Info("time dispatcher started successfully")
+       return nil
+}
+
+// Stop stops the time dispatcher.
+//
+// It cancels the dispatcher context, cancels and forgets every registered
+// timeout, and then stops the wheel timer. A wheel timer stop error is logged
+// rather than returned, so Stop always returns nil. Calling Stop on a
+// dispatcher that is not started is a no-op.
+func (td *TimeDispatch) Stop() error {
+       td.mu.Lock()
+       defer td.mu.Unlock()
+
+       if !td.started {
+               return nil
+       }
+
+       td.logger.Info("stopping time dispatcher")
+
+       // Cancel context
+       td.cancel()
+
+       // Cancel every registered timeout and drop it from its map. Entries are
+       // deleted one by one instead of overwriting the sync.Map value: AddJob and
+       // RemoveJob do not hold td.mu, so replacing the map while they run would
+       // be a data race on the map's internals.
+       cancelAll := func(tasks *sync.Map) {
+               tasks.Range(func(key, value interface{}) bool {
+                       if timeout, ok := value.(*jobtypes.Timeout); ok {
+                               timeout.Cancel()
+                       }
+                       tasks.Delete(key)
+                       return true
+               })
+       }
+       cancelAll(&td.cyclicTasks)
+       cancelAll(&td.tempTasks)
+
+       // Stop wheel timer; a failure here is logged but does not abort shutdown.
+       if err := td.wheelTimer.Stop(); err != nil {
+               td.logger.Error(err, "error stopping wheel timer")
+       }
+
+       td.started = false
+       td.logger.Info("time dispatcher stopped")
+       return nil
+}
+
+// Stats returns dispatcher statistics.
+//
+// The result holds the number of registered cyclic jobs ("cyclicJobs"), the
+// number of registered one-time jobs ("tempJobs"), and whether the dispatcher
+// is currently started ("started"). The counts are gathered by ranging over
+// the two maps, so they may not reflect one consistent instant if jobs are
+// added or removed concurrently.
+func (td *TimeDispatch) Stats() map[string]interface{} {
+       cyclicCount := 0
+       tempCount := 0
+
+       td.cyclicTasks.Range(func(key, value interface{}) bool {
+               cyclicCount++
+               return true
+       })
+
+       td.tempTasks.Range(func(key, value interface{}) bool {
+               tempCount++
+               return true
+       })
+
+       // started is written by Start and Stop while holding td.mu, so it must be
+       // read under the read lock to avoid a data race.
+       td.mu.RLock()
+       started := td.started
+       td.mu.RUnlock()
+
+       return map[string]interface{}{
+               "cyclicJobs": cyclicCount,
+               "tempJobs":   tempCount,
+               "started":    started,
+       }
+}
diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go 
b/internal/collector/common/dispatcher/wheel_timer_task.go
new file mode 100644
index 0000000..ebda8f2
--- /dev/null
+++ b/internal/collector/common/dispatcher/wheel_timer_task.go
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package dispatcher
+
+import (
+       "fmt"
+       "time"
+
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// WheelTimerTask represents a task that runs in the time wheel.
+// This corresponds to Java's WheelTimerTask.
+// Its Run method dispatches the wrapped job through commonDispatcher.
+type WheelTimerTask struct {
+       // job is the job dispatched on every Run; Run fails when it is nil.
+       job              *jobtypes.Job
+       // commonDispatcher receives the job together with the timeout passed to Run.
+       commonDispatcher MetricsTaskDispatcher
+       // logger is named "wheel-timer-task" by NewWheelTimerTask.
+       logger           logger.Logger
+}
+
+// NewWheelTimerTask wraps job in a timer task that dispatches it through
+// commonDispatcher. The task logs under the "wheel-timer-task" name derived
+// from the supplied logger.
+func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher MetricsTaskDispatcher, logger logger.Logger) *WheelTimerTask {
+       task := new(WheelTimerTask)
+       task.job = job
+       task.commonDispatcher = commonDispatcher
+       task.logger = logger.WithName("wheel-timer-task")
+       return task
+}
+
+// Run dispatches the task's job through the common dispatcher.
+// This corresponds to Java's WheelTimerTask.run method.
+//
+// It returns an error when the task has no job or when the dispatcher reports
+// a failure. After a successful dispatch, a cyclic job with a positive
+// DefaultInterval is handed to rescheduleJob.
+func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error {
+       job := wtt.job
+       if job == nil {
+               wtt.logger.Error(nil, "job is nil, cannot execute")
+               return fmt.Errorf("job is nil, cannot execute")
+       }
+
+       wtt.logger.Info("executing wheel timer task",
+               "jobID", job.ID,
+               "monitorID", job.MonitorID,
+               "app", job.App)
+
+       // The common dispatcher is where the job gets broken down into
+       // individual metric collection tasks; time only the hand-off itself.
+       began := time.Now()
+       dispatchErr := wtt.commonDispatcher.DispatchMetricsTask(job, timeout)
+       elapsed := time.Since(began)
+
+       if dispatchErr != nil {
+               wtt.logger.Error(dispatchErr, "failed to dispatch metrics task",
+                       "jobID", job.ID,
+                       "duration", elapsed)
+               return dispatchErr
+       }
+
+       wtt.logger.Info("successfully dispatched metrics task",
+               "jobID", job.ID,
+               "duration", elapsed)
+
+       // Only cyclic jobs with a usable interval are queued for another run.
+       if needsNextRun := job.IsCyclic && job.DefaultInterval > 0; needsNextRun {
+               wtt.rescheduleJob(timeout)
+       }
+
+       return nil
+}
+
+// rescheduleJob is the hook for queueing the next run of a cyclic job.
+// At present it only logs: no new timeout is created and the timeout
+// argument is not used.
+func (wtt *WheelTimerTask) rescheduleJob(timeout *jobtypes.Timeout) {
+       job := wtt.job
+
+       wtt.logger.Info("rescheduling cyclic job",
+               "jobID", job.ID,
+               "interval", job.DefaultInterval)
+
+       // TODO: Implement rescheduling logic
+       // This would involve creating a new timeout with the same job
+       // and adding it back to the time wheel.
+       // Until then, record how long the next run would be away.
+       nextIn := time.Duration(job.DefaultInterval) * time.Second
+       wtt.logger.Info("cyclic job needs rescheduling",
+               "jobID", job.ID,
+               "nextExecutionIn", nextIn)
+}
+
+// GetJob exposes the job this task was created for. The pointer is returned
+// as stored, without copying the job.
+func (wtt *WheelTimerTask) GetJob() *jobtypes.Job {
+       job := wtt.job
+       return job
+}
diff --git a/internal/collector/common/job/job_server.go 
b/internal/collector/common/job/job_server.go
index fdd7830..31eed6f 100644
--- a/internal/collector/common/job/job_server.go
+++ b/internal/collector/common/job/job_server.go
@@ -17,51 +17,191 @@
  * under the License.
  */
 
-package transport
+package job
 
 import (
        "context"
+       "fmt"
+       "sync"
 
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher"
        clrServer 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
 )
 
+// TimeDispatcher defines the time-based job scheduling operations the Runner
+// depends on.
+type TimeDispatcher interface {
+       // AddJob schedules job; Runner.AddAsyncCollectJob forgets the job again
+       // when this returns an error.
+       AddJob(job *jobtypes.Job) error
+       // RemoveJob unschedules the job with the given ID.
+       RemoveJob(jobID int64) error
+       // Start is invoked from Runner.Start with the runner's start context.
+       Start(ctx context.Context) error
+       // Stop is invoked when the runner's context is done and again from Close.
+       Stop() error
+}
+
+// Config represents job service configuration.
+// It embeds clrServer.Server, so that type's fields and methods are promoted
+// onto Config.
 type Config struct {
        clrServer.Server
 }
 
+// Runner implements the service runner interface.
+// It tracks the jobs handed to it and delegates their scheduling to a
+// TimeDispatcher.
 type Runner struct {
        Config
+       // timeDispatch schedules jobs; it is created in New.
+       timeDispatch TimeDispatcher
+       // mu guards runningJobs.
+       mu           sync.RWMutex
+       // runningJobs maps job ID to the job as passed to AddAsyncCollectJob.
+       runningJobs  map[int64]*jobtypes.Job
+       // ctx/cancel are created in New; cancel is invoked by Close.
+       // NOTE(review): ctx is not read by any method shown in this file.
+       ctx          context.Context
+       cancel       context.CancelFunc
+}
+
+// AddAsyncCollectJob records job as running and hands it to the time
+// dispatcher for scheduling. A nil job is rejected. If the dispatcher refuses
+// the job, the running-jobs entry is removed again and the wrapped error is
+// returned.
+func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error {
+       if job == nil {
+               r.Logger.Error(nil, "job cannot be nil")
+               return fmt.Errorf("job cannot be nil")
+       }
+
+       r.Logger.Info("adding async collect job",
+               "jobID", job.ID,
+               "monitorID", job.MonitorID,
+               "app", job.App,
+               "isCyclic", job.IsCyclic)
+
+       r.mu.Lock()
+       defer r.mu.Unlock()
+
+       // Register first, then schedule; undo the registration on failure.
+       r.runningJobs[job.ID] = job
+
+       addErr := r.timeDispatch.AddJob(job)
+       if addErr == nil {
+               r.Logger.Info("successfully added job to scheduler", "jobID", job.ID)
+               return nil
+       }
+
+       delete(r.runningJobs, job.ID)
+       r.Logger.Error(addErr, "failed to add job to time dispatcher", "jobID", job.ID)
+       return fmt.Errorf("failed to add job to time dispatcher: %w", addErr)
+}
+
+// RemoveAsyncCollectJob removes a job from scheduling
+func (r *Runner) RemoveAsyncCollectJob(jobID int64) error {
+       r.Logger.Info("removing async collect job", "jobID", jobID)
+
+       r.mu.Lock()
+       defer r.mu.Unlock()
+
+       // Remove from running jobs
+       delete(r.runningJobs, jobID)
+
+       // Remove from time dispatcher
+       if err := r.timeDispatch.RemoveJob(jobID); err != nil {
+               r.Logger.Error(err, "failed to remove job from time 
dispatcher", "jobID", jobID)
+               return fmt.Errorf("failed to remove job from time dispatcher: 
%w", err)
+       }
+
+       r.Logger.Info("successfully removed job from scheduler", "jobID", jobID)
+       return nil
+}
+
+// RunningJobs returns a snapshot of the running-jobs map taken under the read
+// lock. The map itself is a copy, but its values are the same *jobtypes.Job
+// pointers held by the runner.
+func (r *Runner) RunningJobs() map[int64]*jobtypes.Job {
+       r.mu.RLock()
+       defer r.mu.RUnlock()
+
+       snapshot := map[int64]*jobtypes.Job{}
+       for jobID := range r.runningJobs {
+               snapshot[jobID] = r.runningJobs[jobID]
+       }
+       return snapshot
 }
 
+// New creates a new job service runner with all components initialized
 func New(srv *Config) *Runner {
+       ctx, cancel := context.WithCancel(context.Background())
 
-       return &Runner{
-               Config: *srv,
+       // Create result handler
+       resultHandler := collect.NewResultHandler(srv.Logger)
+
+       // Create metrics collector
+       metricsCollector := dispatch.NewMetricsCollector(srv.Logger)
+
+       // Create common dispatcher
+       commonDispatcher := dispatcher.NewCommonDispatcher(srv.Logger, 
metricsCollector, resultHandler)
+
+       // Create time dispatcher
+       timeDispatch := dispatcher.NewTimeDispatch(srv.Logger, commonDispatcher)
+
+       runner := &Runner{
+               Config:       *srv,
+               timeDispatch: timeDispatch,
+               runningJobs:  make(map[int64]*jobtypes.Job),
+               ctx:          ctx,
+               cancel:       cancel,
        }
+
+       return runner
 }
 
+// Start starts the job service runner
 func (r *Runner) Start(ctx context.Context) error {
-
        r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", 
r.Info().Name)
+       r.Logger.Info("Starting job service runner")
+
+       // Start the time dispatcher
+       if r.timeDispatch != nil {
+               if err := r.timeDispatch.Start(ctx); err != nil {
+                       r.Logger.Error(err, "failed to start time dispatcher")
+                       return fmt.Errorf("failed to start time dispatcher: 
%w", err)
+               }
+       } else {
+               return fmt.Errorf("time dispatcher is not initialized")
+       }
 
-       r.Logger.Info("Starting job server")
+       r.Logger.Info("job service runner started successfully")
 
        select {
        case <-ctx.Done():
+               r.Logger.Info("job service runner stopped by context")
+               if r.timeDispatch != nil {
+                       if err := r.timeDispatch.Stop(); err != nil {
+                               r.Logger.Error(err, "error stopping time 
dispatcher")
+                       }
+               }
                return nil
        }
 }
 
+// Info returns runner information
 func (r *Runner) Info() collector.Info {
-
        return collector.Info{
-               Name: "job",
+               Name: "job-service",
        }
 }
 
+// Close closes the job service runner and all its components
 func (r *Runner) Close() error {
+       r.Logger.Info("closing job service runner")
+
+       r.cancel()
+
+       // Stop time dispatcher
+       if r.timeDispatch != nil {
+               if err := r.timeDispatch.Stop(); err != nil {
+                       r.Logger.Error(err, "error stopping time dispatcher")
+                       // Continue cleanup despite error
+               }
+       }
+
+       // NOTE: the common dispatcher is not stopped here. The time dispatcher's
+       // Stop only cancels scheduled timeouts and stops the wheel timer, and this
+       // runner keeps no direct reference to the common dispatcher.
+
+       // Clear running jobs
+       r.mu.Lock()
+       r.runningJobs = make(map[int64]*jobtypes.Job)
+       r.mu.Unlock()
 
-       r.Logger.Info("job close...")
+       r.Logger.Info("job service runner closed")
        return nil
 }
diff --git a/internal/collector/common/types/job/job_types.go 
b/internal/collector/common/types/job/job_types.go
index 9f1df89..28478b8 100644
--- a/internal/collector/common/types/job/job_types.go
+++ b/internal/collector/common/types/job/job_types.go
@@ -50,10 +50,10 @@ type Job struct {
        ResponseDataTemp []MetricsData        `json:"-"`
 }
 
-// GetNextCollectMetrics returns the metrics that should be collected next
+// NextCollectMetrics returns the metrics that should be collected next
 // This is a simplified version - in the full implementation this would handle
 // metric priorities, dependencies, and collection levels
-func (j *Job) GetNextCollectMetrics() []*Metrics {
+func (j *Job) NextCollectMetrics() []*Metrics {
        result := make([]*Metrics, 0, len(j.Metrics))
        for i := range j.Metrics {
                result = append(result, &j.Metrics[i])
diff --git a/internal/collector/common/types/job/timeout_types.go 
b/internal/collector/common/types/job/timeout_types.go
index 82cd520..ab93c5a 100644
--- a/internal/collector/common/types/job/timeout_types.go
+++ b/internal/collector/common/types/job/timeout_types.go
@@ -79,22 +79,22 @@ func (t *Timeout) Context() context.Context {
        return t.ctx
 }
 
-// SetWheelIndex sets the wheel index for internal use
+// SetWheelIndex sets the wheel index for this timeout
 func (t *Timeout) SetWheelIndex(index int) {
        t.wheelIndex = index
 }
 
-// GetWheelIndex gets the wheel index
-func (t *Timeout) GetWheelIndex() int {
-       return t.wheelIndex
-}
-
-// SetBucketIndex sets the bucket index for internal use
+// SetBucketIndex sets the bucket index for this timeout
 func (t *Timeout) SetBucketIndex(index int) {
        t.bucketIndex = index
 }
 
-// GetBucketIndex gets the bucket index
-func (t *Timeout) GetBucketIndex() int {
+// WheelIndex returns the wheel index
+func (t *Timeout) WheelIndex() int {
+       return t.wheelIndex
+}
+
+// BucketIndex returns the bucket index
+func (t *Timeout) BucketIndex() int {
        return t.bucketIndex
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to