This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new 47ef6c3 Reconfiguration scheduling (#11)
47ef6c3 is described below
commit 47ef6c331eb9c84f7dd2a20d31623eeb6822a9a5
Author: 铁甲小宝 <[email protected]>
AuthorDate: Mon Sep 8 21:40:08 2025 +0800
Reconfiguration scheduling (#11)
* add:The first version of the modified schedule
* add:All protocols are initialized using the init function.
---------
Co-authored-by: Logic <[email protected]>
---
examples/main_simulation.go | 306 ---------------------
internal/cmd/server.go | 6 +
internal/collector/basic/abstract_collect.go | 119 --------
.../collector/basic/database/jdbc_collector.go | 66 +++--
internal/collector/basic/init.go | 56 ++++
internal/collector/basic/registry/.keep | 0
.../common/collect/dispatch/metrics_collector.go | 141 ++++++++++
.../collector/common/collect/result_handler.go | 77 ++++++
.../common/collect/strategy/strategy_factory.go | 119 ++++++--
.../common/dispatcher/common_dispatcher.go | 244 ++++++++++++++++
.../common/dispatcher/hashed_wheel_timer.go | 226 +++++++++++++++
internal/collector/common/dispatcher/time_wheel.go | 223 +++++++++++++++
.../common/dispatcher/wheel_timer_task.go | 105 +++++++
internal/collector/common/job/job_server.go | 156 ++++++++++-
internal/collector/common/types/job/job_types.go | 4 +-
.../collector/common/types/job/timeout_types.go | 18 +-
16 files changed, 1383 insertions(+), 483 deletions(-)
diff --git a/examples/main_simulation.go b/examples/main_simulation.go
deleted file mode 100644
index a87665d..0000000
--- a/examples/main_simulation.go
+++ /dev/null
@@ -1,306 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package main
-
-//import (
-// "context"
-// "flag"
-// "fmt"
-// "os"
-// "os/signal"
-// "syscall"
-// "time"
-//
-// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector"
-//
"hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/dispatcher/entrance"
-// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger"
-// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types"
-// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job"
-//)
-//
-//// go build -ldflags "-X main.Version=x.y.z"
-//var (
-// ConfPath string
-// Version string
-// Simulation bool // 新增:是否启用模拟模式
-//)
-//
-//func init() {
-// flag.StringVar(&ConfPath, "conf", "hertzbeat-collector.yaml", "path to
config file")
-// flag.BoolVar(&Simulation, "simulation", true, "enable manager task
simulation mode")
-//}
-//
-//func main() {
-// flag.Parse()
-//
-// // 初始化日志
-// log := logger.DefaultLogger(os.Stdout, types.LogLevelInfo)
-// log.Info("🚀 启动HertzBeat Collector", "version", Version, "simulation",
Simulation)
-// Simulation = true
-// if Simulation {
-// // 模拟模式:启动完整的CollectServer并模拟Manager发送任务
-// runSimulationMode(log)
-// } else {
-// // 正常模式:启动标准的collector服务
-// if err := collector.Bootstrap(ConfPath, Version); err != nil {
-// log.Error(err, "❌ 启动collector失败")
-// os.Exit(1)
-// }
-// }
-//}
-//
-//// runSimulationMode 运行模拟Manager发送任务的模式
-//func runSimulationMode(log logger.Logger) {
-// log.Info("🎭 启动模拟模式:模拟Manager发送JDBC采集任务")
-//
-// // 1. 创建CollectServer (完整的采集器架构)
-// config := entrance.DefaultServerConfig()
-// collectServer, err := entrance.NewCollectServer(config, log)
-// if err != nil {
-// log.Error(err, "❌ 创建CollectServer失败")
-// return
-// }
-//
-// // 2. 启动CollectServer
-// if err := collectServer.Start(); err != nil {
-// log.Error(err, "❌ 启动CollectServer失败")
-// return
-// }
-// defer collectServer.Stop()
-//
-// log.Info("✅ CollectServer已启动,准备接收Manager任务")
-//
-// // 3. 创建模拟的Manager任务
-// jdbcJob := createManagerJDBCTask()
-//
-// // 4. 启动任务模拟器
-// ctx, cancel := context.WithCancel(context.Background())
-// defer cancel()
-//
-// // 启动模拟Manager发送任务的goroutine
-// go simulateManagerTasks(ctx, collectServer, jdbcJob, log)
-//
-// // 5. 等待中断信号
-// log.Info("🔄 模拟器运行中 (每60秒模拟Manager发送任务),按Ctrl+C停止...")
-// sigCh := make(chan os.Signal, 1)
-// signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
-// <-sigCh
-//
-// log.Info("📴 收到停止信号,正在关闭...")
-// cancel()
-// time.Sleep(2 * time.Second)
-// log.Info("👋 模拟器已停止")
-//}
-//
-//// createManagerJDBCTask 创建模拟从Manager接收的JDBC采集任务
-//func createManagerJDBCTask() *jobtypes.Job {
-// return &jobtypes.Job{
-// ID: 1001,
-// TenantID: 1,
-// MonitorID: 2001,
-// App: "mysql",
-// Category: "database",
-// IsCyclic: true,
-// DefaultInterval: 30, // 30秒间隔
-// Timestamp: time.Now().Unix(),
-//
-// // Manager发送的任务元数据
-// Metadata: map[string]string{
-// "instancename": "生产MySQL实例",
-// "instancehost": "localhost:43306",
-// "manager_node": "hertzbeat-manager-001",
-// "task_source": "manager_scheduler",
-// "collector_assigned": "auto",
-// },
-//
-// Labels: map[string]string{
-// "env": "production",
-// "database": "mysql",
-// "region": "localhost",
-// "managed_by": "hertzbeat",
-// "priority": "high",
-// },
-//
-// Annotations: map[string]string{
-// "description": "MySQL数据库性能监控",
-// "created_by": "manager-scheduler",
-// "task_type": "cyclic_monitoring",
-// "alert_enabled": "true",
-// },
-//
-// // JDBC采集指标配置
-// Metrics: []jobtypes.Metrics{
-// {
-// Name: "mysql_basic_info",
-// Priority: 0,
-// Protocol: "jdbc",
-// Host: "localhost",
-// Port: "3306",
-// Timeout: "15s",
-// Interval: 30,
-// Fields: []jobtypes.Field{
-// {Field: "database_name", Type: 1,
Label: true},
-// {Field: "version", Type: 1, Label:
false},
-// {Field: "uptime_seconds", Type: 0,
Label: false},
-// {Field: "server_id", Type: 0, Label:
false},
-// },
-// JDBC: &jobtypes.JDBCProtocol{
-// Host: "localhost",
-// Port: "3306",
-// Platform: "mysql",
-// Username: "root",
-// Password: "password", // 请修改为实际密码
-// Database: "mysql",
-// Timeout: "15",
-// QueryType: "oneRow",
-// SQL: `SELECT
-// DATABASE() as database_name,
-// VERSION() as version,
-// (SELECT VARIABLE_VALUE FROM
INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME='Uptime') as
uptime_seconds,
-// @@server_id as server_id`,
-// },
-// },
-// },
-// }
-//}
-//
-//// simulateManagerTasks 模拟Manager定期发送采集任务
-//func simulateManagerTasks(ctx context.Context, collectServer
*entrance.CollectServer, job *jobtypes.Job, log logger.Logger) {
-// log.Info("📡 开始模拟Manager任务调度", "jobId", job.ID, "interval", "60s")
-//
-// // 创建任务响应监听器 (模拟发送结果回Manager)
-// eventListener := &ManagerResponseSimulator{
-// logger: log,
-// jobID: job.ID,
-// monitorID: job.MonitorID,
-// }
-//
-// // 立即发送第一个任务 (模拟Manager初始调度)
-// log.Info("📤 Manager发送初始任务", "timestamp", time.Now().Format("15:04:05"))
-// if err := collectServer.ReceiveJob(job, eventListener); err != nil {
-// log.Error(err, "❌ 接收初始任务失败")
-// return
-// }
-//
-// // 模拟Manager定期发送任务更新/重新调度
-// ticker := time.NewTicker(60 * time.Second) // 每60秒模拟一次Manager调度
-// defer ticker.Stop()
-//
-// taskSequence := 1
-//
-// for {
-// select {
-// case <-ticker.C:
-// taskSequence++
-//
-// // 创建任务更新 (模拟Manager重新调度或配置更新)
-// updatedJob := *job
-// updatedJob.Timestamp = time.Now().Unix()
-// updatedJob.ID = job.ID + int64(taskSequence)
-//
-// // 更新任务元数据 (模拟Manager的管理信息)
-// updatedJob.Metadata["task_sequence"] =
fmt.Sprintf("%d", taskSequence)
-// updatedJob.Metadata["last_scheduled"] =
time.Now().Format(time.RFC3339)
-// updatedJob.Metadata["scheduler_version"] = "v1.0.0"
-//
-// log.Info("📤 Manager发送任务更新",
-// "sequence", taskSequence,
-// "jobId", updatedJob.ID,
-// "timestamp", time.Now().Format("15:04:05"))
-//
-// // 发送更新任务到Collector
-// if err := collectServer.ReceiveJob(&updatedJob,
eventListener); err != nil {
-// log.Error(err, "❌ 接收任务更新失败", "sequence",
taskSequence)
-// }
-//
-// case <-ctx.Done():
-// log.Info("🛑 停止模拟Manager任务调度")
-// return
-// }
-// }
-//}
-//
-//// ManagerResponseSimulator 模拟Manager接收采集结果的响应处理器
-//type ManagerResponseSimulator struct {
-// logger logger.Logger
-// jobID int64
-// monitorID int64
-//}
-//
-//// Response 处理采集结果 (模拟发送给Manager的过程)
-//func (mrs *ManagerResponseSimulator) Response(metricsData []interface{}) {
-// mrs.logger.Info("📨 Collector采集完成,准备发送结果到Manager",
-// "jobId", mrs.jobID,
-// "monitorId", mrs.monitorID,
-// "metricsCount", len(metricsData))
-//
-// // 模拟处理每个采集指标的结果
-// for i, data := range metricsData {
-// if collectData, ok := data.(*jobtypes.CollectRepMetricsData);
ok {
-// mrs.logger.Info("📊 处理采集指标",
-// "metricIndex", i+1,
-// "metricName", collectData.Metrics,
-// "code", collectData.Code,
-// "time", collectData.Time)
-//
-// if collectData.Code == 200 {
-// mrs.logger.Info("✅ 指标采集成功",
-// "metric", collectData.Metrics,
-// "fieldsCount", len(collectData.Fields),
-// "valuesCount", len(collectData.Values))
-//
-// // 打印采集数据 (模拟发送到Manager的数据)
-// for rowIndex, valueRow := range
collectData.Values {
-// mrs.logger.Info("📈 采集数据",
-// "metric", collectData.Metrics,
-// "row", rowIndex+1,
-// "values", valueRow.Columns)
-// }
-// } else {
-// mrs.logger.Error(fmt.Errorf("采集失败"), "❌ 指标采集错误",
-// "metric", collectData.Metrics,
-// "code", collectData.Code,
-// "message", collectData.Msg)
-// }
-// }
-// }
-//
-// // 模拟发送HTTP响应到Manager
-// mrs.simulateHTTPResponseToManager(metricsData)
-//}
-//
-//// simulateHTTPResponseToManager 模拟通过HTTP将采集结果发送回Manager
-//func (mrs *ManagerResponseSimulator)
simulateHTTPResponseToManager(metricsData []interface{}) {
-// // 模拟HTTP请求参数
-// managerEndpoint := "http://hertzbeat-manager:1157/api/collector/collect"
-// collectorIdentity := "collector-go-001"
-//
-// mrs.logger.Info("🚀 模拟HTTP请求发送采集结果",
-// "endpoint", managerEndpoint,
-// "collectorId", collectorIdentity,
-// "jobId", mrs.jobID,
-// "dataCount", len(metricsData),
-// "timestamp", time.Now().Format("15:04:05"))
-//
-// // 这里可以添加真实的HTTP客户端代码
-// // httpClient.Post(managerEndpoint, jsonData)
-//
-// mrs.logger.Info("📤 采集结果已发送到Manager (模拟成功)",
-// "status", "200 OK",
-// "responseTime", "15ms")
-//}
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 3b88e5b..81a8bfe 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -30,6 +30,7 @@ import (
"github.com/spf13/cobra"
bannerouter
"hertzbeat.apache.org/hertzbeat-collector-go/internal/banner"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
jobserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
clrServer
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
transportserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
@@ -109,6 +110,11 @@ func server(ctx context.Context, logOut io.Writer) error {
func startRunners(ctx context.Context, cfg *clrServer.Server) error {
+ // Initialize all collectors before starting runners
+ // This ensures all protocol collectors are registered and ready
+ cfg.Logger.Info("Initializing collectors...")
+ basic.InitializeAllCollectors(cfg.Logger)
+
runners := []struct {
runner Runner[collectortypes.Info]
}{
diff --git a/internal/collector/basic/abstract_collect.go
b/internal/collector/basic/abstract_collect.go
deleted file mode 100644
index c67e618..0000000
--- a/internal/collector/basic/abstract_collect.go
+++ /dev/null
@@ -1,119 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package basic
-
-import (
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/timer"
-)
-
-// AbstractCollector defines the interface for all collectors
-type AbstractCollector interface {
- // PreCheck validates metrics configuration
- PreCheck(metrics *jobtypes.Metrics) error
-
- // Collect performs the actual metrics collection
- Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData
-
- // SupportProtocol returns the protocol this collector supports
- SupportProtocol() string
-}
-
-// BaseCollector provides common functionality for all collectors
-type BaseCollector struct {
- Logger logger.Logger
-}
-
-// NewBaseCollector creates a new base collector
-func NewBaseCollector(logger logger.Logger) *BaseCollector {
- return &BaseCollector{
- Logger: logger.WithName("base-collector"),
- }
-}
-
-// CreateSuccessResponse creates a successful metrics data response
-func (bc *BaseCollector) CreateSuccessResponse(metrics *jobtypes.Metrics)
*jobtypes.CollectRepMetricsData {
- return &jobtypes.CollectRepMetricsData{
- ID: 0, // Will be set by the calling context
- MonitorID: 0, // Will be set by the calling context
- App: "", // Will be set by the calling context
- Metrics: metrics.Name,
- Priority: 0,
- Time: timer.GetCurrentTimeMillis(),
- Code: 200, // Success
- Msg: "success",
- Fields: make([]jobtypes.Field, 0),
- Values: make([]jobtypes.ValueRow, 0),
- Labels: make(map[string]string),
- Metadata: make(map[string]string),
- }
-}
-
-// CreateFailResponse creates a failed metrics data response
-func (bc *BaseCollector) CreateFailResponse(metrics *jobtypes.Metrics, code
int, message string) *jobtypes.CollectRepMetricsData {
- return &jobtypes.CollectRepMetricsData{
- ID: 0, // Will be set by the calling context
- MonitorID: 0, // Will be set by the calling context
- App: "", // Will be set by the calling context
- Metrics: metrics.Name,
- Priority: 0,
- Time: timer.GetCurrentTimeMillis(),
- Code: code,
- Msg: message,
- Fields: make([]jobtypes.Field, 0),
- Values: make([]jobtypes.ValueRow, 0),
- Labels: make(map[string]string),
- Metadata: make(map[string]string),
- }
-}
-
-// CollectorRegistry manages registered collectors
-type CollectorRegistry struct {
- collectors map[string]AbstractCollector
- logger logger.Logger
-}
-
-// NewCollectorRegistry creates a new collector registry
-func NewCollectorRegistry(logger logger.Logger) *CollectorRegistry {
- return &CollectorRegistry{
- collectors: make(map[string]AbstractCollector),
- logger: logger.WithName("collector-registry"),
- }
-}
-
-// Register registers a collector for a specific protocol
-func (cr *CollectorRegistry) Register(protocol string, collector
AbstractCollector) {
- cr.collectors[protocol] = collector
- cr.logger.Info("registered collector", "protocol", protocol)
-}
-
-// GetCollector gets a collector for the specified protocol
-func (cr *CollectorRegistry) GetCollector(protocol string) (AbstractCollector,
bool) {
- collector, exists := cr.collectors[protocol]
- return collector, exists
-}
-
-// GetSupportedProtocols returns all supported protocols
-func (cr *CollectorRegistry) GetSupportedProtocols() []string {
- protocols := make([]string, 0, len(cr.collectors))
- for protocol := range cr.collectors {
- protocols = append(protocols, protocol)
- }
- return protocols
-}
diff --git a/internal/collector/basic/database/jdbc_collector.go
b/internal/collector/basic/database/jdbc_collector.go
index 361ccc9..01fa7f2 100644
--- a/internal/collector/basic/database/jdbc_collector.go
+++ b/internal/collector/basic/database/jdbc_collector.go
@@ -30,8 +30,6 @@ import (
_ "github.com/microsoft/go-mssqldb"
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
)
const (
@@ -59,13 +57,13 @@ const (
// JDBCCollector implements JDBC database collection
type JDBCCollector struct {
- *basic.BaseCollector
+ logger logger.Logger
}
// NewJDBCCollector creates a new JDBC collector
func NewJDBCCollector(logger logger.Logger) *JDBCCollector {
return &JDBCCollector{
- BaseCollector:
basic.NewBaseCollector(logger.WithName("jdbc-collector")),
+ logger: logger.WithName("jdbc-collector"),
}
}
@@ -127,7 +125,7 @@ func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics)
*jobtypes.CollectRep
startTime := time.Now()
jdbc := metrics.JDBC
- jc.Logger.Info("starting JDBC collection",
+ jc.logger.Info("starting JDBC collection",
"host", jdbc.Host,
"port", jdbc.Port,
"platform", jdbc.Platform,
@@ -140,20 +138,20 @@ func (jc *JDBCCollector) Collect(metrics
*jobtypes.Metrics) *jobtypes.CollectRep
// Get database URL
databaseURL, err := jc.constructDatabaseURL(jdbc)
if err != nil {
- jc.Logger.Error(err, "failed to construct database URL")
- return jc.CreateFailResponse(metrics, CodeFail,
fmt.Sprintf("Database URL error: %v", err))
+ jc.logger.Error(err, "failed to construct database URL")
+ return jc.createFailResponse(metrics, CodeFail,
fmt.Sprintf("Database URL error: %v", err))
}
// Create database connection
db, err := jc.getConnection(databaseURL, jdbc.Username, jdbc.Password,
timeout)
if err != nil {
- jc.Logger.Error(err, "failed to connect to database")
- return jc.CreateFailResponse(metrics, CodeUnConnectable,
fmt.Sprintf("Connection error: %v", err))
+ jc.logger.Error(err, "failed to connect to database")
+ return jc.createFailResponse(metrics, CodeUnConnectable,
fmt.Sprintf("Connection error: %v", err))
}
defer db.Close()
// Execute query based on type with context
- response := jc.CreateSuccessResponse(metrics)
+ response := jc.createSuccessResponse(metrics)
switch jdbc.QueryType {
case QueryTypeOneRow:
@@ -169,20 +167,20 @@ func (jc *JDBCCollector) Collect(metrics
*jobtypes.Metrics) *jobtypes.CollectRep
}
if err != nil {
- jc.Logger.Error(err, "query execution failed", "queryType",
jdbc.QueryType)
- return jc.CreateFailResponse(metrics, CodeFail,
fmt.Sprintf("Query error: %v", err))
+ jc.logger.Error(err, "query execution failed", "queryType",
jdbc.QueryType)
+ return jc.createFailResponse(metrics, CodeFail,
fmt.Sprintf("Query error: %v", err))
}
duration := time.Since(startTime)
- jc.Logger.Info("JDBC collection completed",
+ jc.logger.Info("JDBC collection completed",
"duration", duration,
"rowCount", len(response.Values))
return response
}
-// SupportProtocol returns the protocol this collector supports
-func (jc *JDBCCollector) SupportProtocol() string {
+// Protocol returns the protocol this collector supports
+func (jc *JDBCCollector) Protocol() string {
return ProtocolJDBC
}
@@ -467,6 +465,42 @@ func (jc *JDBCCollector) queryColumns(db *sql.DB, sqlQuery
string, aliasFields [
// runScript executes a SQL script (placeholder implementation)
func (jc *JDBCCollector) runScript(db *sql.DB, scriptPath string, response
*jobtypes.CollectRepMetricsData) error {
// For security reasons, we'll just return success without actually
running scripts
- jc.Logger.Info("script execution requested but disabled for security",
"scriptPath", scriptPath)
+ jc.logger.Info("script execution requested but disabled for security",
"scriptPath", scriptPath)
return nil
}
+
+// createSuccessResponse creates a successful metrics data response
+func (jc *JDBCCollector) createSuccessResponse(metrics *jobtypes.Metrics)
*jobtypes.CollectRepMetricsData {
+ return &jobtypes.CollectRepMetricsData{
+ ID: 0, // Will be set by the calling context
+ MonitorID: 0, // Will be set by the calling context
+ App: "", // Will be set by the calling context
+ Metrics: metrics.Name,
+ Priority: 0,
+ Time: time.Now().UnixMilli(),
+ Code: 200, // Success
+ Msg: "success",
+ Fields: make([]jobtypes.Field, 0),
+ Values: make([]jobtypes.ValueRow, 0),
+ Labels: make(map[string]string),
+ Metadata: make(map[string]string),
+ }
+}
+
+// createFailResponse creates a failed metrics data response
+func (jc *JDBCCollector) createFailResponse(metrics *jobtypes.Metrics, code
int, message string) *jobtypes.CollectRepMetricsData {
+ return &jobtypes.CollectRepMetricsData{
+ ID: 0, // Will be set by the calling context
+ MonitorID: 0, // Will be set by the calling context
+ App: "", // Will be set by the calling context
+ Metrics: metrics.Name,
+ Priority: 0,
+ Time: time.Now().UnixMilli(),
+ Code: code,
+ Msg: message,
+ Fields: make([]jobtypes.Field, 0),
+ Values: make([]jobtypes.ValueRow, 0),
+ Labels: make(map[string]string),
+ Metadata: make(map[string]string),
+ }
+}
diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go
new file mode 100644
index 0000000..74c4aa6
--- /dev/null
+++ b/internal/collector/basic/init.go
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package basic
+
+import (
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// init 函数在包被导入时自动执行
+// 集中注册所有协议的工厂函数
+func init() {
+ // 注册所有协议的工厂函数
+ // 新增协议时只需要在这里添加一行
+ strategy.RegisterFactory("jdbc", func(logger logger.Logger)
strategy.Collector {
+ return database.NewJDBCCollector(logger)
+ })
+
+ // 未来可以在这里添加更多协议:
+ // strategy.RegisterFactory("http", func(logger logger.Logger)
strategy.Collector {
+ // return http.NewHTTPCollector(logger)
+ // })
+ // strategy.RegisterFactory("redis", func(logger logger.Logger)
strategy.Collector {
+ // return redis.NewRedisCollector(logger)
+ // })
+}
+
+// InitializeAllCollectors 初始化所有已注册的采集器
+// 此时init()函数已经注册了所有工厂函数
+// 这个函数创建实际的采集器实例
+func InitializeAllCollectors(logger logger.Logger) {
+ logger.Info("initializing all collectors")
+
+ // 使用已注册的工厂函数创建采集器实例
+ strategy.InitializeCollectors(logger)
+
+ logger.Info("all collectors initialized successfully")
+}
diff --git a/internal/collector/basic/registry/.keep
b/internal/collector/basic/registry/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go
b/internal/collector/common/collect/dispatch/metrics_collector.go
new file mode 100644
index 0000000..b569858
--- /dev/null
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package dispatch
+
+import (
+ "fmt"
+ "time"
+
+ _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// MetricsCollector handles metrics collection using goroutines and channels
+type MetricsCollector struct {
+ logger logger.Logger
+}
+
+// NewMetricsCollector creates a new metrics collector
+func NewMetricsCollector(logger logger.Logger) *MetricsCollector {
+ return &MetricsCollector{
+ logger: logger.WithName("metrics-collector"),
+ }
+}
+
+// CollectMetrics collects metrics using the appropriate collector and returns
results via channel
+// This method uses Go's channel-based concurrency instead of Java's thread
pools
+func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job
*jobtypes.Job, timeout *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData {
+ resultChan := make(chan *jobtypes.CollectRepMetricsData, 1)
+
+ // Start collection in a goroutine (Go's lightweight thread)
+ go func() {
+ defer close(resultChan)
+
+ mc.logger.Info("starting metrics collection",
+ "jobID", job.ID,
+ "metricsName", metrics.Name,
+ "protocol", metrics.Protocol)
+
+ startTime := time.Now()
+
+ // Get the appropriate collector based on protocol
+ collector, err := strategy.CollectorFor(metrics.Protocol)
+ if err != nil {
+ mc.logger.Error(err, "failed to get collector",
+ "protocol", metrics.Protocol,
+ "metricsName", metrics.Name)
+
+ result := mc.createErrorResponse(metrics, job, 500,
fmt.Sprintf("Collector not found: %v", err))
+ resultChan <- result
+ return
+ }
+
+ // Perform the actual collection
+ result := collector.Collect(metrics)
+
+ // Enrich result with job information
+ if result != nil {
+ result.ID = job.ID
+ result.MonitorID = job.MonitorID
+ result.App = job.App
+ result.TenantID = job.TenantID
+
+ // Add labels from job
+ if result.Labels == nil {
+ result.Labels = make(map[string]string)
+ }
+ for k, v := range job.Labels {
+ result.Labels[k] = v
+ }
+
+ // Add metadata from job
+ if result.Metadata == nil {
+ result.Metadata = make(map[string]string)
+ }
+ for k, v := range job.Metadata {
+ result.Metadata[k] = v
+ }
+ }
+
+ duration := time.Since(startTime)
+
+ if result != nil && result.Code == 200 {
+ mc.logger.Info("metrics collection completed
successfully",
+ "jobID", job.ID,
+ "metricsName", metrics.Name,
+ "protocol", metrics.Protocol,
+ "duration", duration,
+ "valuesCount", len(result.Values))
+ } else {
+ mc.logger.Info("metrics collection failed",
+ "jobID", job.ID,
+ "metricsName", metrics.Name,
+ "protocol", metrics.Protocol,
+ "duration", duration,
+ "code", result.Code,
+ "message", result.Msg)
+ }
+
+ resultChan <- result
+ }()
+
+ return resultChan
+}
+
+// createErrorResponse creates an error response for failed collections
+func (mc *MetricsCollector) createErrorResponse(metrics *jobtypes.Metrics, job
*jobtypes.Job, code int, message string) *jobtypes.CollectRepMetricsData {
+ return &jobtypes.CollectRepMetricsData{
+ ID: job.ID,
+ MonitorID: job.MonitorID,
+ TenantID: job.TenantID,
+ App: job.App,
+ Metrics: metrics.Name,
+ Priority: metrics.Priority,
+ Time: time.Now().UnixMilli(),
+ Code: code,
+ Msg: message,
+ Fields: make([]jobtypes.Field, 0),
+ Values: make([]jobtypes.ValueRow, 0),
+ Labels: make(map[string]string),
+ Metadata: make(map[string]string),
+ }
+}
diff --git a/internal/collector/common/collect/result_handler.go
b/internal/collector/common/collect/result_handler.go
new file mode 100644
index 0000000..701ca63
--- /dev/null
+++ b/internal/collector/common/collect/result_handler.go
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package collect
+
+import (
+ "fmt"
+
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// ResultHandlerImpl implements the ResultHandler interface
+type ResultHandlerImpl struct {
+ logger logger.Logger
+ // TODO: Add data queue or storage interface when needed
+}
+
+// NewResultHandler creates a new result handler
+func NewResultHandler(logger logger.Logger) *ResultHandlerImpl {
+ return &ResultHandlerImpl{
+ logger: logger.WithName("result-handler"),
+ }
+}
+
+// HandleCollectData processes the collection results
+func (rh *ResultHandlerImpl) HandleCollectData(data
*jobtypes.CollectRepMetricsData, job *jobtypes.Job) error {
+ if data == nil {
+ rh.logger.Error(nil, "collect data is nil")
+ return fmt.Errorf("collect data is nil")
+ }
+
+ rh.logger.Info("handling collect data",
+ "jobID", job.ID,
+ "monitorID", job.MonitorID,
+ "metricsName", data.Metrics,
+ "code", data.Code,
+ "valuesCount", len(data.Values))
+
+ // TODO: Implement actual data processing logic
+ // This could include:
+ // 1. Data validation and transformation
+ // 2. Sending to message queue
+ // 3. Storing to database
+ // 4. Triggering alerts based on thresholds
+ // 5. Updating monitoring status
+
+ if data.Code == 200 {
+ rh.logger.Info("successfully processed collect data",
+ "jobID", job.ID,
+ "metricsName", data.Metrics)
+ } else {
+ rh.logger.Info("received failed collect data",
+ "jobID", job.ID,
+ "metricsName", data.Metrics,
+ "code", data.Code,
+ "message", data.Msg)
+ }
+
+ return nil
+}
diff --git a/internal/collector/common/collect/strategy/strategy_factory.go
b/internal/collector/common/collect/strategy/strategy_factory.go
index 17e4874..fa951b0 100644
--- a/internal/collector/common/collect/strategy/strategy_factory.go
+++ b/internal/collector/common/collect/strategy/strategy_factory.go
@@ -18,39 +18,112 @@
package strategy
import (
+ "fmt"
"sync"
+
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
// Collector interface defines the contract for all collectors.
type Collector interface {
	// Collect performs the actual metrics collection
	Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData

	// Protocol returns the protocol this collector supports
	Protocol() string
}

// CollectorFactory is a function that creates a collector with a logger.
// Implementations register these in init(); instances are built later once
// a logger exists (see InitializeCollectors).
type CollectorFactory func(logger.Logger) Collector

// CollectorRegistry manages all registered collector factories and the
// collector instances built from them.
type CollectorRegistry struct {
	factories   map[string]CollectorFactory // protocol -> factory, filled by init()
	collectors  map[string]Collector        // protocol -> instantiated collector
	mu          sync.RWMutex                // guards both maps and initialized
	initialized bool                        // set once InitializeCollectors has run
}

// Global collector registry instance used by the package-level helpers.
var globalRegistry = &CollectorRegistry{
	factories:  make(map[string]CollectorFactory),
	collectors: make(map[string]Collector),
}
- collectStrategy[collect.SupportProtocol()] = collect
+// RegisterFactory registers a collector factory function
+// This is called during init() to register collector factories
+func RegisterFactory(protocol string, factory CollectorFactory) {
+ globalRegistry.mu.Lock()
+ defer globalRegistry.mu.Unlock()
+
+ globalRegistry.factories[protocol] = factory
}
-// Invoke returns the collect strategy for a protocol
-func Invoke(protocol string) AbstractCollect {
+// InitializeCollectors creates actual collector instances using the
registered factories
+// This should be called once when logger is available
+func InitializeCollectors(logger logger.Logger) {
+ globalRegistry.mu.Lock()
+ defer globalRegistry.mu.Unlock()
+
+ if globalRegistry.initialized {
+ return // Already initialized
+ }
+
+ for protocol, factory := range globalRegistry.factories {
+ collector := factory(logger)
+ globalRegistry.collectors[protocol] = collector
+ }
+
+ globalRegistry.initialized = true
+
+ // Log initialization results
+ protocols := make([]string, 0, len(globalRegistry.collectors))
+ for protocol := range globalRegistry.collectors {
+ protocols = append(protocols, protocol)
+ }
+ logger.Info("collectors initialized from factories",
+ "protocols", protocols,
+ "count", len(protocols))
+}
+
+// Register a collector directly (legacy method, kept for compatibility)
+func Register(collector Collector) {
+ globalRegistry.mu.Lock()
+ defer globalRegistry.mu.Unlock()
+
+ protocol := collector.Protocol()
+ globalRegistry.collectors[protocol] = collector
+}
+
+// CollectorFor returns the collector for a protocol
+func CollectorFor(protocol string) (Collector, error) {
+ globalRegistry.mu.RLock()
+ defer globalRegistry.mu.RUnlock()
+
+ collector, exists := globalRegistry.collectors[protocol]
+ if !exists {
+ return nil, fmt.Errorf("no collector found for protocol: %s",
protocol)
+ }
+ return collector, nil
+}
+
+// SupportedProtocols returns all supported protocols
+func SupportedProtocols() []string {
+ globalRegistry.mu.RLock()
+ defer globalRegistry.mu.RUnlock()
+
+ protocols := make([]string, 0, len(globalRegistry.collectors))
+ for protocol := range globalRegistry.collectors {
+ protocols = append(protocols, protocol)
+ }
+ return protocols
+}
- mu.RLock()
- defer mu.RUnlock()
+// CollectorCount returns the number of registered collectors
+func CollectorCount() int {
+ globalRegistry.mu.RLock()
+ defer globalRegistry.mu.RUnlock()
- return collectStrategy[protocol]
+ return len(globalRegistry.collectors)
}
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go
b/internal/collector/common/dispatcher/common_dispatcher.go
new file mode 100644
index 0000000..d7a58c0
--- /dev/null
+++ b/internal/collector/common/dispatcher/common_dispatcher.go
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package dispatcher
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
// CommonDispatcherImpl is responsible for breaking down jobs into individual
// metrics collection tasks. It manages collection timeouts, handles results,
// and decides on next round scheduling.
type CommonDispatcherImpl struct {
	logger           logger.Logger
	metricsCollector MetricsCollector // performs per-metrics collection
	resultHandler    ResultHandler    // consumes finished results
	ctx              context.Context  // dispatcher lifetime; cancelled by Stop
	cancel           context.CancelFunc
	mu               sync.RWMutex
}

// MetricsCollector abstracts asynchronous metrics collection: the returned
// channel eventually yields the result for the given metrics.
type MetricsCollector interface {
	CollectMetrics(metrics *jobtypes.Metrics, job *jobtypes.Job, timeout *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData
}

// ResultHandler consumes collection results produced by the dispatcher.
type ResultHandler interface {
	HandleCollectData(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error
}
+
+// NewCommonDispatcher creates a new common dispatcher
+func NewCommonDispatcher(logger logger.Logger, metricsCollector
MetricsCollector, resultHandler ResultHandler) *CommonDispatcherImpl {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ return &CommonDispatcherImpl{
+ logger: logger.WithName("common-dispatcher"),
+ metricsCollector: metricsCollector,
+ resultHandler: resultHandler,
+ ctx: ctx,
+ cancel: cancel,
+ }
+}
+
+// DispatchMetricsTask dispatches a job by breaking it down into individual
metrics collection tasks
+func (cd *CommonDispatcherImpl) DispatchMetricsTask(job *jobtypes.Job, timeout
*jobtypes.Timeout) error {
+ if job == nil {
+ return fmt.Errorf("job cannot be nil")
+ }
+
+ cd.logger.Info("dispatching metrics task",
+ "jobID", job.ID,
+ "monitorID", job.MonitorID,
+ "app", job.App,
+ "metricsCount", len(job.Metrics))
+
+ startTime := time.Now()
+
+ // Get the next metrics to collect based on job priority and
dependencies
+ metricsToCollect := job.NextCollectMetrics()
+ if len(metricsToCollect) == 0 {
+ cd.logger.Info("no metrics to collect", "jobID", job.ID)
+ return nil
+ }
+
+ // Create collection context with timeout
+ collectCtx, collectCancel := context.WithTimeout(cd.ctx,
cd.getCollectionTimeout(job))
+ defer collectCancel()
+
+ // Collect all metrics concurrently using channels (Go's way of
handling concurrency)
+ resultChannels := make([]chan *jobtypes.CollectRepMetricsData,
len(metricsToCollect))
+
+ for i, metrics := range metricsToCollect {
+ cd.logger.Info("starting metrics collection",
+ "jobID", job.ID,
+ "metricsName", metrics.Name,
+ "protocol", metrics.Protocol)
+
+ // Start metrics collection in goroutine
+ resultChannels[i] = cd.metricsCollector.CollectMetrics(metrics,
job, timeout)
+ }
+
+ // Collect results from all metrics collection tasks
+ var results []*jobtypes.CollectRepMetricsData
+ var errors []error
+
+ for i, resultChan := range resultChannels {
+ select {
+ case result := <-resultChan:
+ if result != nil {
+ results = append(results, result)
+ cd.logger.Info("received metrics result",
+ "jobID", job.ID,
+ "metricsName", metricsToCollect[i].Name,
+ "code", result.Code)
+ }
+ case <-collectCtx.Done():
+ errors = append(errors, fmt.Errorf("timeout collecting
metrics: %s", metricsToCollect[i].Name))
+ cd.logger.Info("metrics collection timeout",
+ "jobID", job.ID,
+ "metricsName", metricsToCollect[i].Name)
+ }
+ }
+
+ duration := time.Since(startTime)
+
+ // Handle collection results
+ if err := cd.handleResults(results, job, errors); err != nil {
+ cd.logger.Error(err, "failed to handle collection results",
+ "jobID", job.ID,
+ "duration", duration)
+ return err
+ }
+
+ cd.logger.Info("successfully dispatched metrics task",
+ "jobID", job.ID,
+ "resultsCount", len(results),
+ "errorsCount", len(errors),
+ "duration", duration)
+
+ return nil
+}
+
+// handleResults processes the collection results and decides on next actions
+func (cd *CommonDispatcherImpl) handleResults(results
[]*jobtypes.CollectRepMetricsData, job *jobtypes.Job, errors []error) error {
+ cd.logger.Info("handling collection results",
+ "jobID", job.ID,
+ "resultsCount", len(results),
+ "errorsCount", len(errors))
+
+ // Process successful results
+ for _, result := range results {
+ if err := cd.resultHandler.HandleCollectData(result, job); err
!= nil {
+ cd.logger.Error(err, "failed to handle collect data",
+ "jobID", job.ID,
+ "metricsName", result.Metrics)
+ // Continue processing other results even if one fails
+ }
+ }
+
+ // Log errors but don't fail the entire job
+ for _, err := range errors {
+ cd.logger.Error(err, "metrics collection error", "jobID",
job.ID)
+ }
+
+ // TODO: Implement logic to decide if we need to trigger next level
metrics
+ // or next round scheduling based on results
+ cd.evaluateNextActions(job, results, errors)
+
+ return nil
+}
+
+// evaluateNextActions decides what to do next based on collection results
+func (cd *CommonDispatcherImpl) evaluateNextActions(job *jobtypes.Job, results
[]*jobtypes.CollectRepMetricsData, errors []error) {
+ cd.logger.Info("evaluating next actions",
+ "jobID", job.ID,
+ "resultsCount", len(results),
+ "errorsCount", len(errors))
+
+ // Check if we have dependent metrics that need to be collected next
+ hasNextLevel := cd.hasNextLevelMetrics(job, results)
+
+ if hasNextLevel {
+ cd.logger.Info("job has next level metrics to collect",
"jobID", job.ID)
+ // TODO: Schedule next level metrics collection
+ }
+
+ // For cyclic jobs, the rescheduling is handled by WheelTimerTask
+ if job.IsCyclic {
+ cd.logger.Info("cyclic job will be rescheduled by timer",
"jobID", job.ID)
+ }
+
+ // TODO: Send results to data queue for further processing
+ cd.sendToDataQueue(results, job)
+}
+
// hasNextLevelMetrics checks if there are dependent metrics that should be
// collected next. Placeholder: it currently always reports false.
func (cd *CommonDispatcherImpl) hasNextLevelMetrics(job *jobtypes.Job, results []*jobtypes.CollectRepMetricsData) bool {
	// This is a simplified check - in a full implementation, this would analyze
	// metric dependencies and priorities to determine if more metrics need collection
	return false
}
+
+// sendToDataQueue sends collection results to the data processing queue
+func (cd *CommonDispatcherImpl) sendToDataQueue(results
[]*jobtypes.CollectRepMetricsData, job *jobtypes.Job) {
+ cd.logger.Info("sending results to data queue",
+ "jobID", job.ID,
+ "resultsCount", len(results))
+
+ // TODO: Implement queue sending logic
+ // For now, we'll just log that data should be sent
+ for _, result := range results {
+ cd.logger.Info("result ready for queue",
+ "jobID", job.ID,
+ "metricsName", result.Metrics,
+ "code", result.Code,
+ "valuesCount", len(result.Values))
+ }
+}
+
+// getCollectionTimeout calculates the timeout for metrics collection
+func (cd *CommonDispatcherImpl) getCollectionTimeout(job *jobtypes.Job)
time.Duration {
+ // Default timeout is 30 seconds
+ defaultTimeout := 30 * time.Second
+
+ // If job has a specific timeout, use it
+ if job.DefaultInterval > 0 {
+ // Use 80% of the interval as timeout to allow for rescheduling
+ jobTimeout := time.Duration(job.DefaultInterval) * time.Second
* 80 / 100
+ if jobTimeout > defaultTimeout {
+ return jobTimeout
+ }
+ }
+
+ return defaultTimeout
+}
+
// Stop stops the common dispatcher by cancelling its context; in-flight
// collection rounds observe the cancellation via their derived collectCtx.
func (cd *CommonDispatcherImpl) Stop() error {
	cd.logger.Info("stopping common dispatcher")
	cd.cancel()
	return nil
}
diff --git a/internal/collector/common/dispatcher/hashed_wheel_timer.go
b/internal/collector/common/dispatcher/hashed_wheel_timer.go
new file mode 100644
index 0000000..026fd2b
--- /dev/null
+++ b/internal/collector/common/dispatcher/hashed_wheel_timer.go
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package dispatcher
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
// hashedWheelTimer implements a time wheel for efficient timeout management.
// Delays are quantized into tickDuration-sized ticks; each tick advances the
// wheel by one bucket and fires the expired timeouts found there.
type hashedWheelTimer struct {
	logger       logger.Logger
	tickDuration time.Duration // real time represented by one tick
	wheelSize    int           // number of buckets in the wheel
	wheel        []*bucket
	currentTick  int64 // monotonically increasing tick counter (atomic)
	startTime    time.Time
	ticker       *time.Ticker
	ctx          context.Context
	cancel       context.CancelFunc
	started      int32          // CAS flag: 1 once Start has run
	stopped      int32          // CAS flag: 1 once Stop has run
	wg           sync.WaitGroup // tracks the run() goroutine
}

// bucket represents a time wheel bucket containing timeouts.
type bucket struct {
	timeouts []*jobtypes.Timeout
	mu       sync.RWMutex // guards timeouts
}
+
+// NewHashedWheelTimer creates a new hashed wheel timer
+func NewHashedWheelTimer(wheelSize int, tickDuration time.Duration, logger
logger.Logger) HashedWheelTimer {
+ if wheelSize <= 0 {
+ wheelSize = 512
+ }
+ if tickDuration <= 0 {
+ tickDuration = 100 * time.Millisecond
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ timer := &hashedWheelTimer{
+ logger: logger.WithName("hashed-wheel-timer"),
+ tickDuration: tickDuration,
+ wheelSize: wheelSize,
+ wheel: make([]*bucket, wheelSize),
+ ctx: ctx,
+ cancel: cancel,
+ }
+
+ // Initialize buckets
+ for i := 0; i < wheelSize; i++ {
+ timer.wheel[i] = &bucket{
+ timeouts: make([]*jobtypes.Timeout, 0),
+ }
+ }
+
+ return timer
+}
+
+// NewTimeout creates a new timeout and adds it to the wheel
+func (hwt *hashedWheelTimer) NewTimeout(task jobtypes.TimerTask, delay
time.Duration) *jobtypes.Timeout {
+ if atomic.LoadInt32(&hwt.stopped) == 1 {
+ hwt.logger.Info("timer is stopped, cannot create new timeout")
+ return nil
+ }
+
+ timeout := jobtypes.NewTimeout(task, delay)
+
+ // Calculate which bucket and tick this timeout belongs to
+ totalTicks := int64(delay / hwt.tickDuration)
+ currentTick := atomic.LoadInt64(&hwt.currentTick)
+ targetTick := currentTick + totalTicks
+
+ bucketIndex := int(targetTick % int64(hwt.wheelSize))
+ timeout.SetWheelIndex(int(targetTick))
+ timeout.SetBucketIndex(bucketIndex)
+
+ // Add to appropriate bucket
+ bucket := hwt.wheel[bucketIndex]
+ bucket.mu.Lock()
+ bucket.timeouts = append(bucket.timeouts, timeout)
+ bucket.mu.Unlock()
+
+ hwt.logger.Info("created timeout",
+ "delay", delay,
+ "bucketIndex", bucketIndex,
+ "targetTick", targetTick)
+
+ return timeout
+}
+
// Start starts the timer wheel: it records the start time, creates the
// ticker, and launches the run() loop on its own goroutine. Idempotent —
// only the first call has any effect (CAS on the started flag).
func (hwt *hashedWheelTimer) Start() error {
	if !atomic.CompareAndSwapInt32(&hwt.started, 0, 1) {
		return nil // already started
	}

	hwt.logger.Info("starting hashed wheel timer",
		"wheelSize", hwt.wheelSize,
		"tickDuration", hwt.tickDuration)

	hwt.startTime = time.Now()
	hwt.ticker = time.NewTicker(hwt.tickDuration)

	// wg is incremented before the goroutine starts so Stop can Wait on it.
	hwt.wg.Add(1)
	go hwt.run()

	hwt.logger.Info("hashed wheel timer started")
	return nil
}
+
// Stop stops the timer wheel: it cancels the context (which makes run()
// return), stops the ticker, and waits for the run() goroutine to exit.
// Idempotent via CAS on the stopped flag.
func (hwt *hashedWheelTimer) Stop() error {
	if !atomic.CompareAndSwapInt32(&hwt.stopped, 0, 1) {
		return nil // already stopped
	}

	hwt.logger.Info("stopping hashed wheel timer")

	hwt.cancel()

	// ticker is nil when Stop is called before Start.
	if hwt.ticker != nil {
		hwt.ticker.Stop()
	}

	hwt.wg.Wait()

	hwt.logger.Info("hashed wheel timer stopped")
	return nil
}
+
// run is the main timer loop: it advances the wheel by one tick per ticker
// event until the context is cancelled by Stop.
func (hwt *hashedWheelTimer) run() {
	defer hwt.wg.Done()

	for {
		select {
		case <-hwt.ctx.Done():
			return
		case <-hwt.ticker.C:
			hwt.tick()
		}
	}
}
+
// tick processes one tick of the wheel: it advances the tick counter, visits
// the corresponding bucket, fires every timeout whose deadline has passed
// (each on its own goroutine, with panic recovery), drops cancelled
// timeouts, and keeps the rest for a later rotation.
func (hwt *hashedWheelTimer) tick() {
	currentTick := atomic.AddInt64(&hwt.currentTick, 1)
	bucketIndex := int(currentTick % int64(hwt.wheelSize))

	bucket := hwt.wheel[bucketIndex]
	bucket.mu.Lock()

	// Process expired timeouts
	var remaining []*jobtypes.Timeout
	for _, timeout := range bucket.timeouts {
		if timeout.IsCancelled() {
			continue // skip (and thereby drop) cancelled timeouts
		}

		// Check if timeout has expired
		if time.Now().After(timeout.Deadline()) {
			// Execute the timeout task asynchronously so a slow task
			// cannot stall the wheel; recover so a panicking task
			// cannot kill the process.
			go func(t *jobtypes.Timeout) {
				defer func() {
					if r := recover(); r != nil {
						hwt.logger.Error(nil, "panic in timeout task execution", "panic", r)
					}
				}()

				if err := t.Task().Run(t); err != nil {
					hwt.logger.Error(err, "error executing timeout task")
				}
			}(timeout)
		} else {
			// Not yet expired (delay spans more than one wheel rotation);
			// keep it in the bucket for a later pass.
			remaining = append(remaining, timeout)
		}
	}

	bucket.timeouts = remaining
	bucket.mu.Unlock()
}
+
+// GetStats returns timer statistics
+func (hwt *hashedWheelTimer) GetStats() map[string]interface{} {
+ totalTimeouts := 0
+ for _, bucket := range hwt.wheel {
+ bucket.mu.RLock()
+ totalTimeouts += len(bucket.timeouts)
+ bucket.mu.RUnlock()
+ }
+
+ return map[string]interface{}{
+ "wheelSize": hwt.wheelSize,
+ "tickDuration": hwt.tickDuration,
+ "currentTick": atomic.LoadInt64(&hwt.currentTick),
+ "totalTimeouts": totalTimeouts,
+ "started": atomic.LoadInt32(&hwt.started) == 1,
+ "stopped": atomic.LoadInt32(&hwt.stopped) == 1,
+ }
+}
diff --git a/internal/collector/common/dispatcher/time_wheel.go
b/internal/collector/common/dispatcher/time_wheel.go
new file mode 100644
index 0000000..2040ec0
--- /dev/null
+++ b/internal/collector/common/dispatcher/time_wheel.go
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package dispatcher
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
// TimeDispatch manages time-based job scheduling using a hashed wheel timer.
type TimeDispatch struct {
	logger           logger.Logger
	wheelTimer       HashedWheelTimer
	commonDispatcher MetricsTaskDispatcher
	cyclicTasks      sync.Map // map[int64]*jobtypes.Timeout for cyclic jobs
	tempTasks        sync.Map // map[int64]*jobtypes.Timeout for one-time jobs
	ctx              context.Context
	cancel           context.CancelFunc
	started          bool // guarded by mu
	mu               sync.RWMutex
}

// MetricsTaskDispatcher interface for metrics task dispatching.
type MetricsTaskDispatcher interface {
	DispatchMetricsTask(job *jobtypes.Job, timeout *jobtypes.Timeout) error
}

// HashedWheelTimer interface for time wheel operations.
type HashedWheelTimer interface {
	// NewTimeout schedules task to run after delay. Implementations may
	// return nil once the timer has been stopped; callers must check.
	NewTimeout(task jobtypes.TimerTask, delay time.Duration) *jobtypes.Timeout
	Start() error
	Stop() error
}
+
+// NewTimeDispatch creates a new time dispatcher
+func NewTimeDispatch(logger logger.Logger, commonDispatcher
MetricsTaskDispatcher) *TimeDispatch {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ td := &TimeDispatch{
+ logger: logger.WithName("time-dispatch"),
+ commonDispatcher: commonDispatcher,
+ ctx: ctx,
+ cancel: cancel,
+ started: false,
+ }
+
+ // Create hashed wheel timer with reasonable defaults
+ // 512 buckets, 100ms tick duration
+ td.wheelTimer = NewHashedWheelTimer(512, 100*time.Millisecond, logger)
+
+ return td
+}
+
+// AddJob adds a job to the time-based scheduler
+// This corresponds to Java's TimerDispatcher.addJob method
+func (td *TimeDispatch) AddJob(job *jobtypes.Job) error {
+ if job == nil {
+ return fmt.Errorf("job cannot be nil")
+ }
+
+ td.logger.Info("adding job to time dispatcher",
+ "jobID", job.ID,
+ "isCyclic", job.IsCyclic,
+ "interval", job.DefaultInterval)
+
+ // Create wheel timer task
+ timerTask := NewWheelTimerTask(job, td.commonDispatcher, td.logger)
+
+ // Calculate delay
+ var delay time.Duration
+ if job.DefaultInterval > 0 {
+ delay = time.Duration(job.DefaultInterval) * time.Second
+ } else {
+ delay = 30 * time.Second // default interval
+ }
+
+ // Create timeout
+ timeout := td.wheelTimer.NewTimeout(timerTask, delay)
+
+ // Store timeout based on job type
+ if job.IsCyclic {
+ td.cyclicTasks.Store(job.ID, timeout)
+ td.logger.Info("added cyclic job to scheduler", "jobID",
job.ID, "delay", delay)
+ } else {
+ td.tempTasks.Store(job.ID, timeout)
+ td.logger.Info("added one-time job to scheduler", "jobID",
job.ID, "delay", delay)
+ }
+
+ return nil
+}
+
+// RemoveJob removes a job from the scheduler
+func (td *TimeDispatch) RemoveJob(jobID int64) error {
+ td.logger.Info("removing job from time dispatcher", "jobID", jobID)
+
+ // Try to remove from cyclic tasks
+ if timeoutInterface, exists := td.cyclicTasks.LoadAndDelete(jobID);
exists {
+ if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
+ timeout.Cancel()
+ td.logger.Info("removed cyclic job", "jobID", jobID)
+ return nil
+ }
+ }
+
+ // Try to remove from temp tasks
+ if timeoutInterface, exists := td.tempTasks.LoadAndDelete(jobID);
exists {
+ if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
+ timeout.Cancel()
+ td.logger.Info("removed one-time job", "jobID", jobID)
+ return nil
+ }
+ }
+
+ return fmt.Errorf("job not found: %d", jobID)
+}
+
// Start starts the time dispatcher by launching the underlying wheel timer.
// It is an error to start an already-started dispatcher; the started flag
// is guarded by mu.
func (td *TimeDispatch) Start(ctx context.Context) error {
	td.mu.Lock()
	defer td.mu.Unlock()

	if td.started {
		return fmt.Errorf("time dispatcher already started")
	}

	td.logger.Info("starting time dispatcher")

	// Start the wheel timer
	if err := td.wheelTimer.Start(); err != nil {
		return fmt.Errorf("failed to start wheel timer: %w", err)
	}

	td.started = true
	td.logger.Info("time dispatcher started successfully")
	return nil
}
+
+// Stop stops the time dispatcher
+func (td *TimeDispatch) Stop() error {
+ td.mu.Lock()
+ defer td.mu.Unlock()
+
+ if !td.started {
+ return nil
+ }
+
+ td.logger.Info("stopping time dispatcher")
+
+ // Cancel context
+ td.cancel()
+
+ // Cancel all running tasks
+ td.cyclicTasks.Range(func(key, value interface{}) bool {
+ if timeout, ok := value.(*jobtypes.Timeout); ok {
+ timeout.Cancel()
+ }
+ return true
+ })
+
+ td.tempTasks.Range(func(key, value interface{}) bool {
+ if timeout, ok := value.(*jobtypes.Timeout); ok {
+ timeout.Cancel()
+ }
+ return true
+ })
+
+ // Clear maps
+ td.cyclicTasks = sync.Map{}
+ td.tempTasks = sync.Map{}
+
+ // Stop wheel timer
+ if err := td.wheelTimer.Stop(); err != nil {
+ td.logger.Error(err, "error stopping wheel timer")
+ }
+
+ td.started = false
+ td.logger.Info("time dispatcher stopped")
+ return nil
+}
+
+// Stats returns dispatcher statistics
+func (td *TimeDispatch) Stats() map[string]interface{} {
+ cyclicCount := 0
+ tempCount := 0
+
+ td.cyclicTasks.Range(func(key, value interface{}) bool {
+ cyclicCount++
+ return true
+ })
+
+ td.tempTasks.Range(func(key, value interface{}) bool {
+ tempCount++
+ return true
+ })
+
+ return map[string]interface{}{
+ "cyclicJobs": cyclicCount,
+ "tempJobs": tempCount,
+ "started": td.started,
+ }
+}
diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go
b/internal/collector/common/dispatcher/wheel_timer_task.go
new file mode 100644
index 0000000..ebda8f2
--- /dev/null
+++ b/internal/collector/common/dispatcher/wheel_timer_task.go
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package dispatcher
+
+import (
+ "fmt"
+ "time"
+
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
// WheelTimerTask represents a task that runs in the time wheel.
// This corresponds to Java's WheelTimerTask.
type WheelTimerTask struct {
	job              *jobtypes.Job         // job to dispatch when the timeout fires
	commonDispatcher MetricsTaskDispatcher // breaks the job into metric tasks
	logger           logger.Logger
}

// NewWheelTimerTask creates a new wheel timer task for the given job.
func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher MetricsTaskDispatcher, logger logger.Logger) *WheelTimerTask {
	return &WheelTimerTask{
		job:              job,
		commonDispatcher: commonDispatcher,
		logger:           logger.WithName("wheel-timer-task"),
	}
}
+
+// Run executes the timer task
+// This corresponds to Java's WheelTimerTask.run method
+func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error {
+ if wtt.job == nil {
+ wtt.logger.Error(nil, "job is nil, cannot execute")
+ return fmt.Errorf("job is nil, cannot execute")
+ }
+
+ wtt.logger.Info("executing wheel timer task",
+ "jobID", wtt.job.ID,
+ "monitorID", wtt.job.MonitorID,
+ "app", wtt.job.App)
+
+ startTime := time.Now()
+
+ // Dispatch metrics task through common dispatcher
+ // This is where the job gets broken down into individual metric
collection tasks
+ err := wtt.commonDispatcher.DispatchMetricsTask(wtt.job, timeout)
+
+ duration := time.Since(startTime)
+
+ if err != nil {
+ wtt.logger.Error(err, "failed to dispatch metrics task",
+ "jobID", wtt.job.ID,
+ "duration", duration)
+ return err
+ }
+
+ wtt.logger.Info("successfully dispatched metrics task",
+ "jobID", wtt.job.ID,
+ "duration", duration)
+
+ // If this is a cyclic job, reschedule it for the next execution
+ if wtt.job.IsCyclic && wtt.job.DefaultInterval > 0 {
+ wtt.rescheduleJob(timeout)
+ }
+
+ return nil
+}
+
+// rescheduleJob reschedules a cyclic job for the next execution
+func (wtt *WheelTimerTask) rescheduleJob(timeout *jobtypes.Timeout) {
+ wtt.logger.Info("rescheduling cyclic job",
+ "jobID", wtt.job.ID,
+ "interval", wtt.job.DefaultInterval)
+
+ // TODO: Implement rescheduling logic
+ // This would involve creating a new timeout with the same job
+ // and adding it back to the time wheel
+ // For now, we'll log that rescheduling is needed
+ wtt.logger.Info("cyclic job needs rescheduling",
+ "jobID", wtt.job.ID,
+ "nextExecutionIn",
time.Duration(wtt.job.DefaultInterval)*time.Second)
+}
+
+// GetJob returns the associated job
+func (wtt *WheelTimerTask) GetJob() *jobtypes.Job {
+ return wtt.job
+}
diff --git a/internal/collector/common/job/job_server.go
b/internal/collector/common/job/job_server.go
index fdd7830..31eed6f 100644
--- a/internal/collector/common/job/job_server.go
+++ b/internal/collector/common/job/job_server.go
@@ -17,51 +17,191 @@
* under the License.
*/
-package transport
+package job
import (
"context"
+ "fmt"
+ "sync"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher"
clrServer
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
)
// TimeDispatcher interface defines time-based job scheduling as consumed by
// the job runner.
type TimeDispatcher interface {
	AddJob(job *jobtypes.Job) error
	RemoveJob(jobID int64) error
	Start(ctx context.Context) error
	Stop() error
}

// Config represents job service configuration.
type Config struct {
	clrServer.Server
}

// Runner implements the service runner interface: it owns the scheduling
// components and tracks the jobs currently under management.
type Runner struct {
	Config
	timeDispatch TimeDispatcher          // time-wheel based scheduler
	mu           sync.RWMutex            // guards runningJobs
	runningJobs  map[int64]*jobtypes.Job // jobID -> currently scheduled job
	ctx          context.Context         // runner lifetime; cancelled by Close
	cancel       context.CancelFunc
}
+
+// AddAsyncCollectJob adds a job to async collection scheduling
+func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error {
+ if job == nil {
+ r.Logger.Error(nil, "job cannot be nil")
+ return fmt.Errorf("job cannot be nil")
+ }
+
+ r.Logger.Info("adding async collect job",
+ "jobID", job.ID,
+ "monitorID", job.MonitorID,
+ "app", job.App,
+ "isCyclic", job.IsCyclic)
+
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ // Store job in running jobs map
+ r.runningJobs[job.ID] = job
+
+ // Add job to time dispatcher for scheduling
+ if err := r.timeDispatch.AddJob(job); err != nil {
+ delete(r.runningJobs, job.ID)
+ r.Logger.Error(err, "failed to add job to time dispatcher",
"jobID", job.ID)
+ return fmt.Errorf("failed to add job to time dispatcher: %w",
err)
+ }
+
+ r.Logger.Info("successfully added job to scheduler", "jobID", job.ID)
+ return nil
+}
+
+// RemoveAsyncCollectJob removes a job from scheduling
+func (r *Runner) RemoveAsyncCollectJob(jobID int64) error {
+ r.Logger.Info("removing async collect job", "jobID", jobID)
+
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ // Remove from running jobs
+ delete(r.runningJobs, jobID)
+
+ // Remove from time dispatcher
+ if err := r.timeDispatch.RemoveJob(jobID); err != nil {
+ r.Logger.Error(err, "failed to remove job from time
dispatcher", "jobID", jobID)
+ return fmt.Errorf("failed to remove job from time dispatcher:
%w", err)
+ }
+
+ r.Logger.Info("successfully removed job from scheduler", "jobID", jobID)
+ return nil
+}
+
+// RunningJobs returns a copy of currently running jobs
+func (r *Runner) RunningJobs() map[int64]*jobtypes.Job {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+
+ result := make(map[int64]*jobtypes.Job)
+ for id, job := range r.runningJobs {
+ result[id] = job
+ }
+ return result
}
+// New creates a new job service runner with all components initialized
func New(srv *Config) *Runner {
+ ctx, cancel := context.WithCancel(context.Background())
- return &Runner{
- Config: *srv,
+ // Create result handler
+ resultHandler := collect.NewResultHandler(srv.Logger)
+
+ // Create metrics collector
+ metricsCollector := dispatch.NewMetricsCollector(srv.Logger)
+
+ // Create common dispatcher
+	commonDispatcher := dispatcher.NewCommonDispatcher(srv.Logger, metricsCollector, resultHandler)
+
+ // Create time dispatcher
+ timeDispatch := dispatcher.NewTimeDispatch(srv.Logger, commonDispatcher)
+
+ runner := &Runner{
+ Config: *srv,
+ timeDispatch: timeDispatch,
+ runningJobs: make(map[int64]*jobtypes.Job),
+ ctx: ctx,
+ cancel: cancel,
}
+
+ return runner
}
+// Start starts the job service runner
func (r *Runner) Start(ctx context.Context) error {
-
 	r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name)
+ r.Logger.Info("Starting job service runner")
+
+ // Start the time dispatcher
+ if r.timeDispatch != nil {
+ if err := r.timeDispatch.Start(ctx); err != nil {
+ r.Logger.Error(err, "failed to start time dispatcher")
+			return fmt.Errorf("failed to start time dispatcher: %w", err)
+ }
+ } else {
+ return fmt.Errorf("time dispatcher is not initialized")
+ }
- r.Logger.Info("Starting job server")
+ r.Logger.Info("job service runner started successfully")
select {
case <-ctx.Done():
+ r.Logger.Info("job service runner stopped by context")
+ if r.timeDispatch != nil {
+ if err := r.timeDispatch.Stop(); err != nil {
+				r.Logger.Error(err, "error stopping time dispatcher")
+ }
+ }
return nil
}
}
+// Info returns runner information
func (r *Runner) Info() collector.Info {
-
return collector.Info{
- Name: "job",
+ Name: "job-service",
}
}
+// Close closes the job service runner and all its components
func (r *Runner) Close() error {
+ r.Logger.Info("closing job service runner")
+
+ r.cancel()
+
+ // Stop time dispatcher
+ if r.timeDispatch != nil {
+ if err := r.timeDispatch.Stop(); err != nil {
+ r.Logger.Error(err, "error stopping time dispatcher")
+ // Continue cleanup despite error
+ }
+ }
+
+ // Stop common dispatcher (via time dispatcher's commonDispatcher field)
+ // This is handled internally by the time dispatcher's Stop method
+
+ // Clear running jobs
+ r.mu.Lock()
+ r.runningJobs = make(map[int64]*jobtypes.Job)
+ r.mu.Unlock()
- r.Logger.Info("job close...")
+ r.Logger.Info("job service runner closed")
return nil
}
diff --git a/internal/collector/common/types/job/job_types.go b/internal/collector/common/types/job/job_types.go
index 9f1df89..28478b8 100644
--- a/internal/collector/common/types/job/job_types.go
+++ b/internal/collector/common/types/job/job_types.go
@@ -50,10 +50,10 @@ type Job struct {
ResponseDataTemp []MetricsData `json:"-"`
}
-// GetNextCollectMetrics returns the metrics that should be collected next
+// NextCollectMetrics returns the metrics that should be collected next
// This is a simplified version - in the full implementation this would handle
// metric priorities, dependencies, and collection levels
-func (j *Job) GetNextCollectMetrics() []*Metrics {
+func (j *Job) NextCollectMetrics() []*Metrics {
result := make([]*Metrics, 0, len(j.Metrics))
for i := range j.Metrics {
result = append(result, &j.Metrics[i])
diff --git a/internal/collector/common/types/job/timeout_types.go b/internal/collector/common/types/job/timeout_types.go
index 82cd520..ab93c5a 100644
--- a/internal/collector/common/types/job/timeout_types.go
+++ b/internal/collector/common/types/job/timeout_types.go
@@ -79,22 +79,22 @@ func (t *Timeout) Context() context.Context {
return t.ctx
}
-// SetWheelIndex sets the wheel index for internal use
+// SetWheelIndex sets the wheel index for this timeout
func (t *Timeout) SetWheelIndex(index int) {
t.wheelIndex = index
}
-// GetWheelIndex gets the wheel index
-func (t *Timeout) GetWheelIndex() int {
- return t.wheelIndex
-}
-
-// SetBucketIndex sets the bucket index for internal use
+// SetBucketIndex sets the bucket index for this timeout
func (t *Timeout) SetBucketIndex(index int) {
t.bucketIndex = index
}
-// GetBucketIndex gets the bucket index
-func (t *Timeout) GetBucketIndex() int {
+// WheelIndex returns the wheel index
+func (t *Timeout) WheelIndex() int {
+ return t.wheelIndex
+}
+
+// BucketIndex returns the bucket index
+func (t *Timeout) BucketIndex() int {
return t.bucketIndex
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]