This is an automated email from the ASF dual-hosted git repository. gaoxingcun pushed a commit to branch reconfiguration_scheduling in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit 165ed67c04c2370933c10e199ab981b550defb10 Author: TJxiaobao <[email protected]> AuthorDate: Mon Sep 8 09:13:40 2025 +0800 add:The first version of the modified schedule --- .gitignore | 1 - etc/hertzbeat-collector.yaml | 25 +++ internal/cmd/server.go | 6 + internal/collector/basic/abstract_collect.go | 119 ---------- .../collector/basic/database/jdbc_collector.go | 66 ++++-- .../{common/job/job_server.go => basic/init.go} | 66 ++---- internal/collector/basic/registry/.keep | 0 .../common/collect/dispatch/metrics_collector.go | 141 ++++++++++++ .../collector/common/collect/result_handler.go | 77 +++++++ .../common/collect/strategy/strategy_factory.go | 110 +++++++-- .../common/dispatcher/common_dispatcher.go | 245 +++++++++++++++++++++ .../common/dispatcher/hashed_wheel_timer.go | 226 +++++++++++++++++++ internal/collector/common/dispatcher/time_wheel.go | 223 +++++++++++++++++++ .../common/dispatcher/wheel_timer_task.go | 105 +++++++++ internal/collector/common/job/job_server.go | 139 +++++++++++- .../collector/common/types/job/timeout_types.go | 18 +- 16 files changed, 1348 insertions(+), 219 deletions(-) diff --git a/.gitignore b/.gitignore index f11a8e8..2600dc2 100644 --- a/.gitignore +++ b/.gitignore @@ -40,7 +40,6 @@ security/cmd/node_agent/na/pkey vendor # Contains the built artifacts out/ -etc/ var/ # Go compiled tests *.test diff --git a/etc/hertzbeat-collector.yaml b/etc/hertzbeat-collector.yaml new file mode 100644 index 0000000..3cae1f6 --- /dev/null +++ b/etc/hertzbeat-collector.yaml @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +collector: + info: + name: hertzbeat-collector-go + ip: 127.0.0.1 + port: 8080 + + log: + level: debug diff --git a/internal/cmd/server.go b/internal/cmd/server.go index 3b88e5b..81a8bfe 100644 --- a/internal/cmd/server.go +++ b/internal/cmd/server.go @@ -30,6 +30,7 @@ import ( "github.com/spf13/cobra" bannerouter "hertzbeat.apache.org/hertzbeat-collector-go/internal/banner" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job" clrServer "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport" @@ -109,6 +110,11 @@ func server(ctx context.Context, logOut io.Writer) error { func startRunners(ctx context.Context, cfg *clrServer.Server) error { + // Initialize all collectors before starting runners + // This ensures all protocol collectors are registered and ready + cfg.Logger.Info("Initializing collectors...") + basic.InitializeAllCollectors(cfg.Logger) + runners := []struct { runner Runner[collectortypes.Info] }{ diff --git a/internal/collector/basic/abstract_collect.go b/internal/collector/basic/abstract_collect.go deleted file mode 100644 index c67e618..0000000 --- a/internal/collector/basic/abstract_collect.go +++ /dev/null @@ -1,119 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package basic - -import ( - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/timer" -) - -// AbstractCollector defines the interface for all collectors -type AbstractCollector interface { - // PreCheck validates metrics configuration - PreCheck(metrics *jobtypes.Metrics) error - - // Collect performs the actual metrics collection - Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData - - // SupportProtocol returns the protocol this collector supports - SupportProtocol() string -} - -// BaseCollector provides common functionality for all collectors -type BaseCollector struct { - Logger logger.Logger -} - -// NewBaseCollector creates a new base collector -func NewBaseCollector(logger logger.Logger) *BaseCollector { - return &BaseCollector{ - Logger: logger.WithName("base-collector"), - } -} - -// CreateSuccessResponse creates a successful metrics data response -func (bc *BaseCollector) CreateSuccessResponse(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData { - return &jobtypes.CollectRepMetricsData{ - ID: 0, // Will be set by the calling context - MonitorID: 0, // Will be set by the calling context - App: "", // Will be set by the calling context - Metrics: metrics.Name, - Priority: 0, - Time: timer.GetCurrentTimeMillis(), - Code: 200, // Success - Msg: "success", - Fields: make([]jobtypes.Field, 0), - Values: make([]jobtypes.ValueRow, 0), - Labels: make(map[string]string), - Metadata: make(map[string]string), - } -} - -// CreateFailResponse creates a failed metrics data response -func (bc *BaseCollector) CreateFailResponse(metrics *jobtypes.Metrics, code int, message string) *jobtypes.CollectRepMetricsData { - return &jobtypes.CollectRepMetricsData{ - ID: 0, // Will be set by the calling context - MonitorID: 0, // Will be set by the calling context - App: "", // Will be set by the calling context - Metrics: metrics.Name, - Priority: 0, - Time: timer.GetCurrentTimeMillis(), - Code: code, - Msg: message, - Fields: make([]jobtypes.Field, 0), - Values: make([]jobtypes.ValueRow, 0), - Labels: make(map[string]string), - Metadata: make(map[string]string), - } -} - -// CollectorRegistry manages registered collectors -type CollectorRegistry struct { - collectors map[string]AbstractCollector - logger logger.Logger -} - -// NewCollectorRegistry creates a new collector registry -func NewCollectorRegistry(logger logger.Logger) *CollectorRegistry { - return &CollectorRegistry{ - collectors: make(map[string]AbstractCollector), - logger: logger.WithName("collector-registry"), - } -} - -// Register registers a collector for a specific protocol -func (cr *CollectorRegistry) Register(protocol string, collector AbstractCollector) { - cr.collectors[protocol] = collector - cr.logger.Info("registered collector", "protocol", protocol) -} - -// GetCollector gets a collector for the specified protocol -func (cr *CollectorRegistry) GetCollector(protocol string) (AbstractCollector, bool) { - collector, exists := cr.collectors[protocol] - return collector, exists -} - -// GetSupportedProtocols returns all supported protocols -func (cr *CollectorRegistry) GetSupportedProtocols() []string { - protocols := make([]string, 0, len(cr.collectors)) - for protocol := range cr.collectors { - protocols = append(protocols, protocol) - } - return protocols -} diff --git a/internal/collector/basic/database/jdbc_collector.go b/internal/collector/basic/database/jdbc_collector.go index 361ccc9..01fa7f2 100644 --- a/internal/collector/basic/database/jdbc_collector.go +++ b/internal/collector/basic/database/jdbc_collector.go @@ -30,8 +30,6 @@ import ( _ "github.com/microsoft/go-mssqldb" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" - - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" ) const ( @@ -59,13 +57,13 @@ const ( // JDBCCollector implements JDBC database collection type JDBCCollector struct { - *basic.BaseCollector + logger logger.Logger } // NewJDBCCollector creates a new JDBC collector func NewJDBCCollector(logger logger.Logger) *JDBCCollector { return &JDBCCollector{ - BaseCollector: basic.NewBaseCollector(logger.WithName("jdbc-collector")), + logger: logger.WithName("jdbc-collector"), } } @@ -127,7 +125,7 @@ func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRep startTime := time.Now() jdbc := metrics.JDBC - jc.Logger.Info("starting JDBC collection", + jc.logger.Info("starting JDBC collection", "host", jdbc.Host, "port", jdbc.Port, "platform", jdbc.Platform, @@ -140,20 +138,20 @@ func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRep // Get database URL databaseURL, err := jc.constructDatabaseURL(jdbc) if err != nil { - jc.Logger.Error(err, "failed to construct database URL") - return jc.CreateFailResponse(metrics, CodeFail, fmt.Sprintf("Database URL error: %v", err)) + jc.logger.Error(err, "failed to construct database URL") + return jc.createFailResponse(metrics, CodeFail, fmt.Sprintf("Database URL error: %v", err)) } // Create database connection db, err := jc.getConnection(databaseURL, jdbc.Username, jdbc.Password, timeout) if err != nil { - jc.Logger.Error(err, "failed to connect to database") - return jc.CreateFailResponse(metrics, CodeUnConnectable, fmt.Sprintf("Connection error: %v", err)) + jc.logger.Error(err, "failed to connect to database") + return jc.createFailResponse(metrics, CodeUnConnectable, fmt.Sprintf("Connection error: %v", err)) } defer db.Close() // Execute query based on type with context - response := jc.CreateSuccessResponse(metrics) + response := jc.createSuccessResponse(metrics) switch jdbc.QueryType { case QueryTypeOneRow: @@ -169,20 +167,20 @@ func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRep } if err != nil { - jc.Logger.Error(err, "query execution failed", "queryType", jdbc.QueryType) - return jc.CreateFailResponse(metrics, CodeFail, fmt.Sprintf("Query error: %v", err)) + jc.logger.Error(err, "query execution failed", "queryType", jdbc.QueryType) + return jc.createFailResponse(metrics, CodeFail, fmt.Sprintf("Query error: %v", err)) } duration := time.Since(startTime) - jc.Logger.Info("JDBC collection completed", + jc.logger.Info("JDBC collection completed", "duration", duration, "rowCount", len(response.Values)) return response } -// SupportProtocol returns the protocol this collector supports -func (jc *JDBCCollector) SupportProtocol() string { +// Protocol returns the protocol this collector supports +func (jc *JDBCCollector) Protocol() string { return ProtocolJDBC } @@ -467,6 +465,42 @@ func (jc *JDBCCollector) queryColumns(db *sql.DB, sqlQuery string, aliasFields [ // runScript executes a SQL script (placeholder implementation) func (jc *JDBCCollector) runScript(db *sql.DB, scriptPath string, response *jobtypes.CollectRepMetricsData) error { // For security reasons, we'll just return success without actually running scripts - jc.Logger.Info("script execution requested but disabled for security", "scriptPath", scriptPath) + jc.logger.Info("script execution requested but disabled for security", "scriptPath", scriptPath) return nil } + +// createSuccessResponse creates a successful metrics data response +func (jc *JDBCCollector) createSuccessResponse(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData { + return &jobtypes.CollectRepMetricsData{ + ID: 0, // Will be set by the calling context + MonitorID: 0, // Will be set by the calling context + App: "", // Will be set by the calling context + Metrics: metrics.Name, + Priority: 0, + Time: time.Now().UnixMilli(), + Code: 200, // Success + Msg: "success", + Fields: make([]jobtypes.Field, 0), + Values: make([]jobtypes.ValueRow, 0), + Labels: make(map[string]string), + Metadata: make(map[string]string), + } +} + +// createFailResponse creates a failed metrics data response +func (jc *JDBCCollector) createFailResponse(metrics *jobtypes.Metrics, code int, message string) *jobtypes.CollectRepMetricsData { + return &jobtypes.CollectRepMetricsData{ + ID: 0, // Will be set by the calling context + MonitorID: 0, // Will be set by the calling context + App: "", // Will be set by the calling context + Metrics: metrics.Name, + Priority: 0, + Time: time.Now().UnixMilli(), + Code: code, + Msg: message, + Fields: make([]jobtypes.Field, 0), + Values: make([]jobtypes.ValueRow, 0), + Labels: make(map[string]string), + Metadata: make(map[string]string), + } +} diff --git a/internal/collector/common/job/job_server.go b/internal/collector/basic/init.go similarity index 50% copy from internal/collector/common/job/job_server.go copy to internal/collector/basic/init.go index fdd7830..e8456ed 100644 --- a/internal/collector/common/job/job_server.go +++ b/internal/collector/basic/init.go @@ -17,51 +17,31 @@ * under the License. */ -package transport +package basic import ( - "context" - - clrServer "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) -type Config struct { - clrServer.Server -} - -type Runner struct { - Config -} - -func New(srv *Config) *Runner { - - return &Runner{ - Config: *srv, - } -} - -func (r *Runner) Start(ctx context.Context) error { - - r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name) - - r.Logger.Info("Starting job server") - - select { - case <-ctx.Done(): - return nil - } -} - -func (r *Runner) Info() collector.Info { - - return collector.Info{ - Name: "job", - } -} - -func (r *Runner) Close() error { - - r.Logger.Info("job close...") - return nil +// InitializeAllCollectors 初始化所有已注册的采集器 +// 当logger可用时调用此函数来创建实际的采集器实例 +func InitializeAllCollectors(logger logger.Logger) { + logger.Info("initializing all collectors") + + // 直接注册所有采集器实例 + // 新增协议时在这里添加 + jdbcCollector := database.NewJDBCCollector(logger) + strategy.Register(jdbcCollector) + + // 未来可以在这里添加更多协议: + // httpCollector := http.NewHTTPCollector(logger) + // strategy.Register(httpCollector) + + // 记录初始化结果 + protocols := strategy.SupportedProtocols() + logger.Info("collectors initialized successfully", + "protocols", protocols, + "count", len(protocols)) } diff --git a/internal/collector/basic/registry/.keep b/internal/collector/basic/registry/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go b/internal/collector/common/collect/dispatch/metrics_collector.go new file mode 100644 index 0000000..b569858 --- /dev/null +++ b/internal/collector/common/collect/dispatch/metrics_collector.go @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package dispatch + +import ( + "fmt" + "time" + + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// MetricsCollector handles metrics collection using goroutines and channels +type MetricsCollector struct { + logger logger.Logger +} + +// NewMetricsCollector creates a new metrics collector +func NewMetricsCollector(logger logger.Logger) *MetricsCollector { + return &MetricsCollector{ + logger: logger.WithName("metrics-collector"), + } +} + +// CollectMetrics collects metrics using the appropriate collector and returns results via channel +// This method uses Go's channel-based concurrency instead of Java's thread pools +func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job *jobtypes.Job, timeout *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData { + resultChan := make(chan *jobtypes.CollectRepMetricsData, 1) + + // Start collection in a goroutine (Go's lightweight thread) + go func() { + defer close(resultChan) + + mc.logger.Info("starting metrics collection", + "jobID", job.ID, + "metricsName", metrics.Name, + "protocol", metrics.Protocol) + + startTime := time.Now() + + // Get the appropriate collector based on protocol + collector, err := strategy.CollectorFor(metrics.Protocol) + if err != nil { + mc.logger.Error(err, "failed to get collector", + "protocol", metrics.Protocol, + "metricsName", metrics.Name) + + result := mc.createErrorResponse(metrics, job, 500, fmt.Sprintf("Collector not found: %v", err)) + resultChan <- result + return + } + + // Perform the actual collection + result := collector.Collect(metrics) + + // Enrich result with job information + if result != nil { + result.ID = job.ID + result.MonitorID = job.MonitorID + result.App = job.App + result.TenantID = job.TenantID + + // Add labels from job + if result.Labels == nil { + result.Labels = make(map[string]string) + } + for k, v := range job.Labels { + result.Labels[k] = v + } + + // Add metadata from job + if result.Metadata == nil { + result.Metadata = make(map[string]string) + } + for k, v := range job.Metadata { + result.Metadata[k] = v + } + } + + duration := time.Since(startTime) + + if result != nil && result.Code == 200 { + mc.logger.Info("metrics collection completed successfully", + "jobID", job.ID, + "metricsName", metrics.Name, + "protocol", metrics.Protocol, + "duration", duration, + "valuesCount", len(result.Values)) + } else { + mc.logger.Info("metrics collection failed", + "jobID", job.ID, + "metricsName", metrics.Name, + "protocol", metrics.Protocol, + "duration", duration, + "code", result.Code, + "message", result.Msg) + } + + resultChan <- result + }() + + return resultChan +} + +// createErrorResponse creates an error response for failed collections +func (mc *MetricsCollector) createErrorResponse(metrics *jobtypes.Metrics, job *jobtypes.Job, code int, message string) *jobtypes.CollectRepMetricsData { + return &jobtypes.CollectRepMetricsData{ + ID: job.ID, + MonitorID: job.MonitorID, + TenantID: job.TenantID, + App: job.App, + Metrics: metrics.Name, + Priority: metrics.Priority, + Time: time.Now().UnixMilli(), + Code: code, + Msg: message, + Fields: make([]jobtypes.Field, 0), + Values: make([]jobtypes.ValueRow, 0), + Labels: make(map[string]string), + Metadata: make(map[string]string), + } +} diff --git a/internal/collector/common/collect/result_handler.go b/internal/collector/common/collect/result_handler.go new file mode 100644 index 0000000..701ca63 --- /dev/null +++ b/internal/collector/common/collect/result_handler.go @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package collect + +import ( + "fmt" + + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// ResultHandlerImpl implements the ResultHandler interface +type ResultHandlerImpl struct { + logger logger.Logger + // TODO: Add data queue or storage interface when needed +} + +// NewResultHandler creates a new result handler +func NewResultHandler(logger logger.Logger) *ResultHandlerImpl { + return &ResultHandlerImpl{ + logger: logger.WithName("result-handler"), + } +} + +// HandleCollectData processes the collection results +func (rh *ResultHandlerImpl) HandleCollectData(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error { + if data == nil { + rh.logger.Error(nil, "collect data is nil") + return fmt.Errorf("collect data is nil") + } + + rh.logger.Info("handling collect data", + "jobID", job.ID, + "monitorID", job.MonitorID, + "metricsName", data.Metrics, + "code", data.Code, + "valuesCount", len(data.Values)) + + // TODO: Implement actual data processing logic + // This could include: + // 1. Data validation and transformation + // 2. Sending to message queue + // 3. Storing to database + // 4. Triggering alerts based on thresholds + // 5. Updating monitoring status + + if data.Code == 200 { + rh.logger.Info("successfully processed collect data", + "jobID", job.ID, + "metricsName", data.Metrics) + } else { + rh.logger.Info("received failed collect data", + "jobID", job.ID, + "metricsName", data.Metrics, + "code", data.Code, + "message", data.Msg) + } + + return nil +} diff --git a/internal/collector/common/collect/strategy/strategy_factory.go b/internal/collector/common/collect/strategy/strategy_factory.go index 17e4874..9fc9af1 100644 --- a/internal/collector/common/collect/strategy/strategy_factory.go +++ b/internal/collector/common/collect/strategy/strategy_factory.go @@ -18,39 +18,103 @@ package strategy import ( + "fmt" "sync" + + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) -// AbstractCollect interface -type AbstractCollect interface { - SupportProtocol() string +// Collector interface defines the contract for all collectors +type Collector interface { + // Collect performs the actual metrics collection + Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData + + // Protocol returns the protocol this collector supports + Protocol() string } -// strategy container -var ( - collectStrategy = make(map[string]AbstractCollect) - mu sync.RWMutex -) +// CollectorFactory is a function that creates a collector with a logger +type CollectorFactory func(logger.Logger) Collector -// Register a collect strategy -// Example: registration in init() of each implementation file -// -// func init() { -// Register(&XXXCollect{}) -// } -func Register(collect AbstractCollect) { +// CollectorRegistry manages all registered collector factories +type CollectorRegistry struct { + factories map[string]CollectorFactory + collectors map[string]Collector + mu sync.RWMutex + initialized bool +} + +// Global collector registry instance +var globalRegistry = &CollectorRegistry{ + factories: make(map[string]CollectorFactory), + collectors: make(map[string]Collector), +} + +// RegisterFactory registers a collector factory function +// This is called during init() to register collector factories +func RegisterFactory(protocol string, factory CollectorFactory) { + globalRegistry.mu.Lock() + defer globalRegistry.mu.Unlock() + + globalRegistry.factories[protocol] = factory +} - mu.Lock() - defer mu.Unlock() +// InitializeCollectors creates actual collector instances using the registered factories +// This should be called once when logger is available +func InitializeCollectors(logger logger.Logger) { + globalRegistry.mu.Lock() + defer globalRegistry.mu.Unlock() - collectStrategy[collect.SupportProtocol()] = collect + if globalRegistry.initialized { + return // Already initialized + } + + for protocol, factory := range globalRegistry.factories { + collector := factory(logger) + globalRegistry.collectors[protocol] = collector + } + + globalRegistry.initialized = true +} + +// Register a collector directly (legacy method, kept for compatibility) +func Register(collector Collector) { + globalRegistry.mu.Lock() + defer globalRegistry.mu.Unlock() + + protocol := collector.Protocol() + globalRegistry.collectors[protocol] = collector } -// Invoke returns the collect strategy for a protocol -func Invoke(protocol string) AbstractCollect { +// CollectorFor returns the collector for a protocol +func CollectorFor(protocol string) (Collector, error) { + globalRegistry.mu.RLock() + defer globalRegistry.mu.RUnlock() + + collector, exists := globalRegistry.collectors[protocol] + if !exists { + return nil, fmt.Errorf("no collector found for protocol: %s", protocol) + } + return collector, nil +} + +// SupportedProtocols returns all supported protocols +func SupportedProtocols() []string { + globalRegistry.mu.RLock() + defer globalRegistry.mu.RUnlock() + + protocols := make([]string, 0, len(globalRegistry.collectors)) + for protocol := range globalRegistry.collectors { + protocols = append(protocols, protocol) + } + return protocols +} - mu.RLock() - defer mu.RUnlock() +// CollectorCount returns the number of registered collectors +func CollectorCount() int { + globalRegistry.mu.RLock() + defer globalRegistry.mu.RUnlock() - return collectStrategy[protocol] + return len(globalRegistry.collectors) } diff --git a/internal/collector/common/dispatcher/common_dispatcher.go b/internal/collector/common/dispatcher/common_dispatcher.go new file mode 100644 index 0000000..95fb25e --- /dev/null +++ b/internal/collector/common/dispatcher/common_dispatcher.go @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package dispatcher + +import ( + "context" + "fmt" + "sync" + "time" + + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// CommonDispatcherImpl is responsible for breaking down jobs into individual metrics collection tasks +// It manages collection timeouts, handles results, and decides on next round scheduling +type CommonDispatcherImpl struct { + logger logger.Logger + metricsCollector MetricsCollector + resultHandler ResultHandler + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex +} + +// MetricsCollector interface for metrics collection +type MetricsCollector interface { + CollectMetrics(metrics *jobtypes.Metrics, job *jobtypes.Job, timeout *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData +} + +// ResultHandler interface for handling collection results +type ResultHandler interface { + HandleCollectData(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error +} + +// NewCommonDispatcher creates a new common dispatcher +func NewCommonDispatcher(logger logger.Logger, metricsCollector MetricsCollector, resultHandler ResultHandler) *CommonDispatcherImpl { + ctx, cancel := context.WithCancel(context.Background()) + + return &CommonDispatcherImpl{ + logger: logger.WithName("common-dispatcher"), + metricsCollector: metricsCollector, + resultHandler: resultHandler, + ctx: ctx, + cancel: cancel, + } +} + +// DispatchMetricsTask dispatches a job by breaking it down into individual metrics collection tasks +// This is the core method that corresponds to Java's CommonDispatcher.dispatchMetricsTask +func (cd *CommonDispatcherImpl) DispatchMetricsTask(job *jobtypes.Job, timeout *jobtypes.Timeout) error { + if job == nil { + return fmt.Errorf("job cannot be nil") + } + + cd.logger.Info("dispatching metrics task", + "jobID", job.ID, + "monitorID", job.MonitorID, + "app", job.App, + "metricsCount", len(job.Metrics)) + + startTime := time.Now() + + // Get the next metrics to collect based on job priority and dependencies + metricsToCollect := job.GetNextCollectMetrics() + if len(metricsToCollect) == 0 { + cd.logger.Info("no metrics to collect", "jobID", job.ID) + return nil + } + + // Create collection context with timeout + collectCtx, collectCancel := context.WithTimeout(cd.ctx, cd.getCollectionTimeout(job)) + defer collectCancel() + + // Collect all metrics concurrently using channels (Go's way of handling concurrency) + resultChannels := make([]chan *jobtypes.CollectRepMetricsData, len(metricsToCollect)) + + for i, metrics := range metricsToCollect { + cd.logger.Info("starting metrics collection", + "jobID", job.ID, + "metricsName", metrics.Name, + "protocol", metrics.Protocol) + + // Start metrics collection in goroutine + resultChannels[i] = cd.metricsCollector.CollectMetrics(metrics, job, timeout) + } + + // Collect results from all metrics collection tasks + var results []*jobtypes.CollectRepMetricsData + var errors []error + + for i, resultChan := range resultChannels { + select { + case result := <-resultChan: + if result != nil { + results = append(results, result) + cd.logger.Info("received metrics result", + "jobID", job.ID, + "metricsName", metricsToCollect[i].Name, + "code", result.Code) + } + case <-collectCtx.Done(): + errors = append(errors, fmt.Errorf("timeout collecting metrics: %s", metricsToCollect[i].Name)) + cd.logger.Info("metrics collection timeout", + "jobID", job.ID, + "metricsName", metricsToCollect[i].Name) + } + } + + duration := time.Since(startTime) + + // Handle collection results + if err := cd.handleResults(results, job, errors); err != nil { + cd.logger.Error(err, "failed to handle collection results", + "jobID", job.ID, + "duration", duration) + return err + } + + cd.logger.Info("successfully dispatched metrics task", + "jobID", job.ID, + "resultsCount", len(results), + "errorsCount", len(errors), + "duration", duration) + + return nil +} + +// handleResults processes the collection results and decides on next actions +func (cd *CommonDispatcherImpl) handleResults(results []*jobtypes.CollectRepMetricsData, job *jobtypes.Job, errors []error) error { + cd.logger.Info("handling collection results", + "jobID", job.ID, + "resultsCount", len(results), + "errorsCount", len(errors)) + + // Process successful results + for _, result := range results { + if err := cd.resultHandler.HandleCollectData(result, job); err != nil { + cd.logger.Error(err, "failed to handle collect data", + "jobID", job.ID, + "metricsName", result.Metrics) + // Continue processing other results even if one fails + } + } + + // Log errors but don't fail the entire job + for _, err := range errors { + cd.logger.Error(err, "metrics collection error", "jobID", job.ID) + } + + // TODO: Implement logic to decide if we need to trigger next level metrics + // or next round scheduling based on results + cd.evaluateNextActions(job, results, errors) + + return nil +} + +// evaluateNextActions decides what to do next based on collection results +func (cd *CommonDispatcherImpl) evaluateNextActions(job *jobtypes.Job, results []*jobtypes.CollectRepMetricsData, errors []error) { + cd.logger.Info("evaluating next actions", + "jobID", job.ID, + "resultsCount", len(results), + "errorsCount", len(errors)) + + // Check if we have dependent metrics that need to be collected next + hasNextLevel := cd.hasNextLevelMetrics(job, results) + + if hasNextLevel { + cd.logger.Info("job has next level metrics to collect", "jobID", job.ID) + // TODO: Schedule next level metrics collection + } + + // For cyclic jobs, the rescheduling is handled by WheelTimerTask + if job.IsCyclic { + cd.logger.Info("cyclic job will be rescheduled by timer", "jobID", job.ID) + } + + // TODO: Send results to data queue for further processing + cd.sendToDataQueue(results, job) +} + +// hasNextLevelMetrics checks if there are dependent metrics that should be collected next +func (cd *CommonDispatcherImpl) hasNextLevelMetrics(job *jobtypes.Job, results []*jobtypes.CollectRepMetricsData) bool { + // This is a simplified check - in a full implementation, this would analyze + // metric dependencies and priorities to determine if more metrics need collection + return false +} + +// sendToDataQueue sends collection results to the data processing queue +func (cd *CommonDispatcherImpl) sendToDataQueue(results []*jobtypes.CollectRepMetricsData, job *jobtypes.Job) { + cd.logger.Info("sending results to data queue", + "jobID", job.ID, + "resultsCount", len(results)) + + // TODO: Implement queue sending logic + // For now, we'll just log that data should be sent + for _, result := range results { + cd.logger.Info("result ready for queue", + "jobID", job.ID, + "metricsName", result.Metrics, + "code", result.Code, + "valuesCount", len(result.Values)) + } +} + +// getCollectionTimeout calculates the timeout for metrics collection +func (cd *CommonDispatcherImpl) getCollectionTimeout(job *jobtypes.Job) time.Duration { + // Default timeout is 30 seconds + defaultTimeout := 30 * time.Second + + // If job has a specific timeout, use it + if job.DefaultInterval > 0 { + // Use 80% of the interval as timeout to allow for rescheduling + jobTimeout := time.Duration(job.DefaultInterval) * time.Second * 80 / 100 + if jobTimeout > defaultTimeout { + return jobTimeout + } + } + + return defaultTimeout +} + +// Stop stops the common dispatcher +func (cd *CommonDispatcherImpl) Stop() error { + cd.logger.Info("stopping common dispatcher") + cd.cancel() + return nil +} diff --git a/internal/collector/common/dispatcher/hashed_wheel_timer.go b/internal/collector/common/dispatcher/hashed_wheel_timer.go new file mode 100644 index 0000000..026fd2b --- /dev/null +++ b/internal/collector/common/dispatcher/hashed_wheel_timer.go @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package dispatcher + +import ( + "context" + "sync" + "sync/atomic" + "time" + + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// hashedWheelTimer implements a time wheel for efficient timeout management +type hashedWheelTimer struct { + logger logger.Logger + tickDuration time.Duration + wheelSize int + wheel []*bucket + currentTick int64 + startTime time.Time + ticker *time.Ticker + ctx context.Context + cancel context.CancelFunc + started int32 + stopped int32 + wg sync.WaitGroup +} + +// bucket represents a time wheel bucket containing timeouts +type bucket struct { + timeouts []*jobtypes.Timeout + mu sync.RWMutex +} + +// NewHashedWheelTimer creates a new hashed wheel timer +func NewHashedWheelTimer(wheelSize int, tickDuration time.Duration, logger logger.Logger) HashedWheelTimer { + if wheelSize <= 0 { + wheelSize = 512 + } + if tickDuration <= 0 { + tickDuration = 100 * time.Millisecond + } + + ctx, cancel := context.WithCancel(context.Background()) + + timer := &hashedWheelTimer{ + logger: logger.WithName("hashed-wheel-timer"), + tickDuration: tickDuration, + wheelSize: wheelSize, + wheel: make([]*bucket, wheelSize), + ctx: ctx, + cancel: cancel, + } + + // Initialize buckets + for i := 0; i < wheelSize; i++ { + timer.wheel[i] = &bucket{ + timeouts: make([]*jobtypes.Timeout, 0), + } + } + + return timer +} + +// NewTimeout creates a new timeout and adds it to the wheel +func (hwt *hashedWheelTimer) NewTimeout(task jobtypes.TimerTask, delay time.Duration) *jobtypes.Timeout { + if atomic.LoadInt32(&hwt.stopped) == 1 { + hwt.logger.Info("timer is stopped, cannot create new timeout") + return nil + } + + timeout := jobtypes.NewTimeout(task, delay) + + // Calculate which bucket and tick this timeout belongs to + totalTicks := int64(delay / hwt.tickDuration) + currentTick := atomic.LoadInt64(&hwt.currentTick) + targetTick := currentTick + totalTicks + + bucketIndex := int(targetTick % int64(hwt.wheelSize)) + timeout.SetWheelIndex(int(targetTick)) + timeout.SetBucketIndex(bucketIndex) + + // Add to appropriate bucket + bucket := hwt.wheel[bucketIndex] + bucket.mu.Lock() + bucket.timeouts = append(bucket.timeouts, timeout) + bucket.mu.Unlock() + + hwt.logger.Info("created timeout", + "delay", delay, + "bucketIndex", bucketIndex, + "targetTick", targetTick) + + return timeout +} + +// Start starts the timer wheel +func (hwt *hashedWheelTimer) Start() error { + if !atomic.CompareAndSwapInt32(&hwt.started, 0, 1) { + return nil // already started + } + + hwt.logger.Info("starting hashed wheel timer", + "wheelSize", hwt.wheelSize, + "tickDuration", hwt.tickDuration) + + hwt.startTime = time.Now() + hwt.ticker = time.NewTicker(hwt.tickDuration) + + hwt.wg.Add(1) + go hwt.run() + + hwt.logger.Info("hashed wheel timer started") + return nil +} + +// Stop stops the timer wheel +func (hwt *hashedWheelTimer) Stop() error { + if !atomic.CompareAndSwapInt32(&hwt.stopped, 0, 1) { + return nil // already stopped + } + + hwt.logger.Info("stopping hashed wheel timer") + + hwt.cancel() + + if hwt.ticker != nil { + hwt.ticker.Stop() + } + + hwt.wg.Wait() + + hwt.logger.Info("hashed wheel timer stopped") + return nil +} + +// run is the main timer loop +func (hwt *hashedWheelTimer) run() { + defer hwt.wg.Done() + + for { + select { + case <-hwt.ctx.Done(): + return + case <-hwt.ticker.C: + hwt.tick() + } + } +} + +// tick processes one tick of the wheel +func (hwt *hashedWheelTimer) tick() { + currentTick := atomic.AddInt64(&hwt.currentTick, 1) + bucketIndex := int(currentTick % int64(hwt.wheelSize)) + + bucket := hwt.wheel[bucketIndex] + bucket.mu.Lock() + + // Process expired timeouts + var remaining []*jobtypes.Timeout + for _, timeout := range bucket.timeouts { + if timeout.IsCancelled() { + continue // skip cancelled timeouts + } + + // Check if timeout has expired + if time.Now().After(timeout.Deadline()) { + // Execute the timeout task asynchronously + go func(t *jobtypes.Timeout) { + defer func() { + if r := recover(); r != nil { + hwt.logger.Error(nil, "panic in timeout task execution", "panic", r) + } + }() + + if err := t.Task().Run(t); err != nil { + hwt.logger.Error(err, "error executing timeout task") + } + }(timeout) + } else { + // Not yet expired, keep in bucket + remaining = append(remaining, timeout) + } + } + + bucket.timeouts = remaining + bucket.mu.Unlock() +} + +// GetStats returns timer statistics +func (hwt *hashedWheelTimer) GetStats() map[string]interface{} { + totalTimeouts := 0 + for _, bucket := range hwt.wheel { + bucket.mu.RLock() + totalTimeouts += len(bucket.timeouts) + bucket.mu.RUnlock() + } + + return map[string]interface{}{ + "wheelSize": hwt.wheelSize, + "tickDuration": hwt.tickDuration, + "currentTick": atomic.LoadInt64(&hwt.currentTick), + "totalTimeouts": totalTimeouts, + "started": atomic.LoadInt32(&hwt.started) == 1, + "stopped": atomic.LoadInt32(&hwt.stopped) == 1, + } +} diff --git a/internal/collector/common/dispatcher/time_wheel.go b/internal/collector/common/dispatcher/time_wheel.go new file mode 100644 index 0000000..2040ec0 --- /dev/null +++ b/internal/collector/common/dispatcher/time_wheel.go @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package dispatcher + +import ( + "context" + "fmt" + "sync" + "time" + + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// TimeDispatch manages time-based job scheduling using a hashed wheel timer +type TimeDispatch struct { + logger logger.Logger + wheelTimer HashedWheelTimer + commonDispatcher MetricsTaskDispatcher + cyclicTasks sync.Map // map[int64]*jobtypes.Timeout for cyclic jobs + tempTasks sync.Map // map[int64]*jobtypes.Timeout for one-time jobs + ctx context.Context + cancel context.CancelFunc + started bool + mu sync.RWMutex +} + +// MetricsTaskDispatcher interface for metrics task dispatching +type MetricsTaskDispatcher interface { + DispatchMetricsTask(job *jobtypes.Job, timeout *jobtypes.Timeout) error +} + +// HashedWheelTimer interface for time wheel operations +type HashedWheelTimer interface { + NewTimeout(task jobtypes.TimerTask, delay time.Duration) *jobtypes.Timeout + Start() error + Stop() error +} + +// NewTimeDispatch creates a new time dispatcher +func NewTimeDispatch(logger logger.Logger, commonDispatcher MetricsTaskDispatcher) *TimeDispatch { + ctx, cancel := context.WithCancel(context.Background()) + + td := &TimeDispatch{ + logger: logger.WithName("time-dispatch"), + commonDispatcher: commonDispatcher, + ctx: ctx, + cancel: cancel, + started: false, + } + + // Create hashed wheel timer with reasonable defaults + // 512 buckets, 100ms tick duration + td.wheelTimer = NewHashedWheelTimer(512, 100*time.Millisecond, logger) + + return td +} + +// AddJob adds a job to the time-based scheduler +// This corresponds to Java's TimerDispatcher.addJob method +func (td *TimeDispatch) AddJob(job *jobtypes.Job) error { + if job == nil { + return fmt.Errorf("job cannot be nil") + } + + td.logger.Info("adding job to time dispatcher", + "jobID", job.ID, + "isCyclic", job.IsCyclic, + "interval", job.DefaultInterval) + + // Create wheel timer task + timerTask := NewWheelTimerTask(job, td.commonDispatcher, td.logger) + + // Calculate delay + var delay time.Duration + if job.DefaultInterval > 0 { + delay = time.Duration(job.DefaultInterval) * time.Second + } else { + delay = 30 * time.Second // default interval + } + + // Create timeout + timeout := td.wheelTimer.NewTimeout(timerTask, delay) + + // Store timeout based on job type + if job.IsCyclic { + td.cyclicTasks.Store(job.ID, timeout) + td.logger.Info("added cyclic job to scheduler", "jobID", job.ID, "delay", delay) + } else { + td.tempTasks.Store(job.ID, timeout) + td.logger.Info("added one-time job to scheduler", "jobID", job.ID, "delay", delay) + } + + return nil +} + +// RemoveJob removes a job from the scheduler +func (td *TimeDispatch) RemoveJob(jobID int64) error { + td.logger.Info("removing job from time dispatcher", "jobID", jobID) + + // Try to remove from cyclic tasks + if timeoutInterface, exists := td.cyclicTasks.LoadAndDelete(jobID); exists { + if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok { + timeout.Cancel() + td.logger.Info("removed cyclic job", "jobID", jobID) + return nil + } + } + + // Try to remove from temp tasks + if timeoutInterface, exists := td.tempTasks.LoadAndDelete(jobID); exists { + if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok { + timeout.Cancel() + td.logger.Info("removed one-time job", "jobID", jobID) + return nil + } + } + + return fmt.Errorf("job not found: %d", jobID) +} + +// Start starts the time dispatcher +func (td *TimeDispatch) Start(ctx context.Context) error { + td.mu.Lock() + defer td.mu.Unlock() + + if td.started { + return fmt.Errorf("time dispatcher already started") + } + + td.logger.Info("starting time dispatcher") + + // Start the wheel timer + if err := td.wheelTimer.Start(); err != nil { + return fmt.Errorf("failed to start wheel timer: %w", err) + } + + td.started = true + td.logger.Info("time dispatcher started successfully") + return nil +} + +// Stop stops the time dispatcher +func (td *TimeDispatch) Stop() error { + td.mu.Lock() + defer td.mu.Unlock() + + if !td.started { + return nil + } + + td.logger.Info("stopping time dispatcher") + + // Cancel context + td.cancel() + + // Cancel all running tasks + td.cyclicTasks.Range(func(key, value interface{}) bool { + if timeout, ok := value.(*jobtypes.Timeout); ok { + timeout.Cancel() + } + return true + }) + + td.tempTasks.Range(func(key, value interface{}) bool { + if timeout, ok := value.(*jobtypes.Timeout); ok { + timeout.Cancel() + } + return true + }) + + // Clear maps + td.cyclicTasks = sync.Map{} + td.tempTasks = sync.Map{} + + // Stop wheel timer + if err := td.wheelTimer.Stop(); err != nil { + td.logger.Error(err, "error stopping wheel timer") + } + + td.started = false + td.logger.Info("time dispatcher stopped") + return nil +} + +// Stats returns dispatcher statistics +func (td *TimeDispatch) Stats() map[string]interface{} { + cyclicCount := 0 + tempCount := 0 + + td.cyclicTasks.Range(func(key, value interface{}) bool { + cyclicCount++ + return true + }) + + td.tempTasks.Range(func(key, value interface{}) bool { + tempCount++ + return true + }) + + return map[string]interface{}{ + "cyclicJobs": cyclicCount, + "tempJobs": tempCount, + "started": td.started, + } +} diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go b/internal/collector/common/dispatcher/wheel_timer_task.go new file mode 100644 index 0000000..ebda8f2 --- /dev/null +++ b/internal/collector/common/dispatcher/wheel_timer_task.go @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package dispatcher + +import ( + "fmt" + "time" + + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// WheelTimerTask represents a task that runs in the time wheel +// This corresponds to Java's WheelTimerTask +type WheelTimerTask struct { + job *jobtypes.Job + commonDispatcher MetricsTaskDispatcher + logger logger.Logger +} + +// NewWheelTimerTask creates a new wheel timer task +func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher MetricsTaskDispatcher, logger logger.Logger) *WheelTimerTask { + return &WheelTimerTask{ + job: job, + commonDispatcher: commonDispatcher, + logger: logger.WithName("wheel-timer-task"), + } +} + +// Run executes the timer task +// This corresponds to Java's WheelTimerTask.run method +func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error { + if wtt.job == nil { + wtt.logger.Error(nil, "job is nil, cannot execute") + return fmt.Errorf("job is nil, cannot execute") + } + + wtt.logger.Info("executing wheel timer task", + "jobID", wtt.job.ID, + "monitorID", wtt.job.MonitorID, + "app", wtt.job.App) + + startTime := time.Now() + + // Dispatch metrics task through common dispatcher + // This is where the job gets broken down into individual metric collection tasks + err := wtt.commonDispatcher.DispatchMetricsTask(wtt.job, timeout) + + duration := time.Since(startTime) + + if err != nil { + wtt.logger.Error(err, "failed to dispatch metrics task", + "jobID", wtt.job.ID, + "duration", duration) + return err + } + + wtt.logger.Info("successfully dispatched metrics task", + "jobID", wtt.job.ID, + "duration", duration) + + // If this is a cyclic job, reschedule it for the next execution + if wtt.job.IsCyclic && wtt.job.DefaultInterval > 0 { + wtt.rescheduleJob(timeout) + } + + return nil +} + +// rescheduleJob reschedules a cyclic job for the next execution +func (wtt *WheelTimerTask) rescheduleJob(timeout *jobtypes.Timeout) { + wtt.logger.Info("rescheduling cyclic job", + "jobID", wtt.job.ID, + "interval", wtt.job.DefaultInterval) + + // TODO: Implement rescheduling logic + // This would involve creating a new timeout with the same job + // and adding it back to the time wheel + // For now, we'll log that rescheduling is needed + wtt.logger.Info("cyclic job needs rescheduling", + "jobID", wtt.job.ID, + "nextExecutionIn", time.Duration(wtt.job.DefaultInterval)*time.Second) +} + +// GetJob returns the associated job +func (wtt *WheelTimerTask) GetJob() *jobtypes.Job { + return wtt.job +} diff --git a/internal/collector/common/job/job_server.go b/internal/collector/common/job/job_server.go index fdd7830..35cb195 100644 --- a/internal/collector/common/job/job_server.go +++ b/internal/collector/common/job/job_server.go @@ -17,51 +17,174 @@ * under the License. */ -package transport +package job import ( "context" + "fmt" + "sync" clrServer "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" ) +// TimeDispatcher interface defines time-based job scheduling +type TimeDispatcher interface { + AddJob(job *jobtypes.Job) error + RemoveJob(jobID int64) error + Start(ctx context.Context) error + Stop() error +} + +// Config represents job service configuration type Config struct { clrServer.Server } +// Runner implements the service runner interface +// It directly manages job scheduling through timeDispatch type Runner struct { Config + timeDispatch TimeDispatcher + mu sync.RWMutex + runningJobs map[int64]*jobtypes.Job + ctx context.Context + cancel context.CancelFunc } -func New(srv *Config) *Runner { +// AddAsyncCollectJob adds a job to async collection scheduling +// This is the main entry point that corresponds to Java's collectJobService.addAsyncCollectJob +func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error { + if job == nil { + r.Logger.Error(nil, "job cannot be nil") + return fmt.Errorf("job cannot be nil") + } + + r.Logger.Info("adding async collect job", + "jobID", job.ID, + "monitorID", job.MonitorID, + "app", job.App, + "isCyclic", job.IsCyclic) + + r.mu.Lock() + defer r.mu.Unlock() + + // Store job in running jobs map + r.runningJobs[job.ID] = job + + // Add job to time dispatcher for scheduling + if err := r.timeDispatch.AddJob(job); err != nil { + delete(r.runningJobs, job.ID) + r.Logger.Error(err, "failed to add job to time dispatcher", "jobID", job.ID) + return fmt.Errorf("failed to add job to time dispatcher: %w", err) + } + r.Logger.Info("successfully added job to scheduler", "jobID", job.ID) + return nil +} + +// RemoveAsyncCollectJob removes a job from scheduling +func (r *Runner) RemoveAsyncCollectJob(jobID int64) error { + r.Logger.Info("removing async collect job", "jobID", jobID) + + r.mu.Lock() + defer r.mu.Unlock() + + // Remove from running jobs + delete(r.runningJobs, jobID) + + // Remove from time dispatcher + if err := r.timeDispatch.RemoveJob(jobID); err != nil { + r.Logger.Error(err, "failed to remove job from time dispatcher", "jobID", jobID) + return fmt.Errorf("failed to remove job from time dispatcher: %w", err) + } + + r.Logger.Info("successfully removed job from scheduler", "jobID", jobID) + return nil +} + +// RunningJobs returns a copy of currently running jobs +func (r *Runner) RunningJobs() map[int64]*jobtypes.Job { + r.mu.RLock() + defer r.mu.RUnlock() + + result := make(map[int64]*jobtypes.Job) + for id, job := range r.runningJobs { + result[id] = job + } + return result +} + +// New creates a new job service runner +func New(srv *Config) *Runner { + ctx, cancel := context.WithCancel(context.Background()) return &Runner{ - Config: *srv, + Config: *srv, + runningJobs: make(map[int64]*jobtypes.Job), + ctx: ctx, + cancel: cancel, } } -func (r *Runner) Start(ctx context.Context) error { +// SetTimeDispatcher sets the time dispatcher for this runner +// This should be called before starting the runner +func (r *Runner) SetTimeDispatcher(td TimeDispatcher) { + r.timeDispatch = td +} +// Start starts the job service runner +func (r *Runner) Start(ctx context.Context) error { r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name) + r.Logger.Info("Starting job service runner") + + // Start the time dispatcher if it's set + if r.timeDispatch != nil { + if err := r.timeDispatch.Start(ctx); err != nil { + r.Logger.Error(err, "failed to start time dispatcher") + return fmt.Errorf("failed to start time dispatcher: %w", err) + } + } - r.Logger.Info("Starting job server") + r.Logger.Info("job service runner started successfully") select { case <-ctx.Done(): + r.Logger.Info("job service runner stopped by context") + if r.timeDispatch != nil { + if err := r.timeDispatch.Stop(); err != nil { + r.Logger.Error(err, "error stopping time dispatcher") + } + } return nil } } +// Info returns runner information func (r *Runner) Info() collector.Info { - return collector.Info{ - Name: "job", + Name: "job-service", } } +// Close closes the job service runner func (r *Runner) Close() error { + r.Logger.Info("closing job service runner") + + r.cancel() + + if r.timeDispatch != nil { + if err := r.timeDispatch.Stop(); err != nil { + r.Logger.Error(err, "error stopping time dispatcher") + return err + } + } + + // Clear running jobs + r.mu.Lock() + r.runningJobs = make(map[int64]*jobtypes.Job) + r.mu.Unlock() - r.Logger.Info("job close...") + r.Logger.Info("job service runner closed") return nil } diff --git a/internal/collector/common/types/job/timeout_types.go b/internal/collector/common/types/job/timeout_types.go index 82cd520..ab93c5a 100644 --- a/internal/collector/common/types/job/timeout_types.go +++ b/internal/collector/common/types/job/timeout_types.go @@ -79,22 +79,22 @@ func (t *Timeout) Context() context.Context { return t.ctx } -// SetWheelIndex sets the wheel index for internal use +// SetWheelIndex sets the wheel index for this timeout func (t *Timeout) SetWheelIndex(index int) { t.wheelIndex = index } -// GetWheelIndex gets the wheel index -func (t *Timeout) GetWheelIndex() int { - return t.wheelIndex -} - -// SetBucketIndex sets the bucket index for internal use +// SetBucketIndex sets the bucket index for this timeout func (t *Timeout) SetBucketIndex(index int) { t.bucketIndex = index } -// GetBucketIndex gets the bucket index -func (t *Timeout) GetBucketIndex() int { +// WheelIndex returns the wheel index +func (t *Timeout) WheelIndex() int { + return t.wheelIndex +} + +// BucketIndex returns the bucket index +func (t *Timeout) BucketIndex() int { return t.bucketIndex } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
