This is an automated email from the ASF dual-hosted git repository.
shown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new db2aa0a add:Improve the go code specification based on yesterday's
opinions. (#14)
db2aa0a is described below
commit db2aa0a1eff630742023d96dd5523f5fe8285a29
Author: 铁甲小宝 <[email protected]>
AuthorDate: Tue Sep 9 23:12:10 2025 +0800
add:Improve the go code specification based on yesterday's opinions. (#14)
---
examples/main_simulation.go | 20 +++++++++
internal/cmd/server.go | 7 ---
internal/collector/basic/init.go | 18 ++++----
.../common/collect/dispatch/metrics_collector.go | 7 ++-
.../collector/common/collect/result_handler.go | 10 ++++-
.../common/dispatcher/common_dispatcher.go | 21 +++------
internal/collector/common/dispatcher/time_wheel.go | 15 ++-----
.../common/dispatcher/wheel_timer_task.go | 7 ++-
internal/collector/common/job/job_server.go | 50 +++++++++++++---------
9 files changed, 85 insertions(+), 70 deletions(-)
diff --git a/examples/main_simulation.go b/examples/main_simulation.go
new file mode 100644
index 0000000..e17ac7a
--- /dev/null
+++ b/examples/main_simulation.go
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package examples
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 81a8bfe..c67fef8 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -30,7 +30,6 @@ import (
"github.com/spf13/cobra"
bannerouter
"hertzbeat.apache.org/hertzbeat-collector-go/internal/banner"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
jobserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
clrServer
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
transportserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
@@ -109,12 +108,6 @@ func server(ctx context.Context, logOut io.Writer) error {
}
func startRunners(ctx context.Context, cfg *clrServer.Server) error {
-
- // Initialize all collectors before starting runners
- // This ensures all protocol collectors are registered and ready
- cfg.Logger.Info("Initializing collectors...")
- basic.InitializeAllCollectors(cfg.Logger)
-
runners := []struct {
runner Runner[collectortypes.Info]
}{
diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go
index 74c4aa6..f4a7504 100644
--- a/internal/collector/basic/init.go
+++ b/internal/collector/basic/init.go
@@ -25,16 +25,16 @@ import (
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
-// init 函数在包被导入时自动执行
-// 集中注册所有协议的工厂函数
+// init function is automatically executed when the package is imported
+// It centrally registers factory functions for all protocols
func init() {
- // 注册所有协议的工厂函数
- // 新增协议时只需要在这里添加一行
+ // Register factory functions for all protocols
+ // To add a new protocol, simply add a line here
strategy.RegisterFactory("jdbc", func(logger logger.Logger)
strategy.Collector {
return database.NewJDBCCollector(logger)
})
- // 未来可以在这里添加更多协议:
+ // More protocols can be added here in the future:
// strategy.RegisterFactory("http", func(logger logger.Logger)
strategy.Collector {
// return http.NewHTTPCollector(logger)
// })
@@ -43,13 +43,13 @@ func init() {
// })
}
-// InitializeAllCollectors 初始化所有已注册的采集器
-// 此时init()函数已经注册了所有工厂函数
-// 这个函数创建实际的采集器实例
+// InitializeAllCollectors initializes all registered collectors
+// At this point, the init() function has already registered all factory
functions
+// This function creates the actual collector instances
func InitializeAllCollectors(logger logger.Logger) {
logger.Info("initializing all collectors")
- // 使用已注册的工厂函数创建采集器实例
+ // Create collector instances using registered factory functions
strategy.InitializeCollectors(logger)
logger.Info("all collectors initialized successfully")
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go
b/internal/collector/common/collect/dispatch/metrics_collector.go
index b569858..85e7580 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -21,8 +21,11 @@ package dispatch
import (
"fmt"
+ "net/http"
"time"
+ // Import basic package with blank identifier to trigger its init()
function
+ // This ensures all collector factories are registered automatically
_ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
@@ -64,7 +67,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
"protocol", metrics.Protocol,
"metricsName", metrics.Name)
- result := mc.createErrorResponse(metrics, job, 500,
fmt.Sprintf("Collector not found: %v", err))
+ result := mc.createErrorResponse(metrics, job,
http.StatusInternalServerError, fmt.Sprintf("Collector not found: %v", err))
resultChan <- result
return
}
@@ -98,7 +101,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
duration := time.Since(startTime)
- if result != nil && result.Code == 200 {
+ if result != nil && result.Code == http.StatusOK {
mc.logger.Info("metrics collection completed
successfully",
"jobID", job.ID,
"metricsName", metrics.Name,
diff --git a/internal/collector/common/collect/result_handler.go
b/internal/collector/common/collect/result_handler.go
index 701ca63..7f5a304 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/collector/common/collect/result_handler.go
@@ -21,6 +21,7 @@ package collect
import (
"fmt"
+ "net/http"
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
@@ -32,8 +33,13 @@ type ResultHandlerImpl struct {
// TODO: Add data queue or storage interface when needed
}
+// ResultHandler interface for handling collection results
+type ResultHandler interface {
+ HandleCollectData(data *jobtypes.CollectRepMetricsData, job
*jobtypes.Job) error
+}
+
// NewResultHandler creates a new result handler
-func NewResultHandler(logger logger.Logger) *ResultHandlerImpl {
+func NewResultHandler(logger logger.Logger) ResultHandler {
return &ResultHandlerImpl{
logger: logger.WithName("result-handler"),
}
@@ -61,7 +67,7 @@ func (rh *ResultHandlerImpl) HandleCollectData(data
*jobtypes.CollectRepMetricsD
// 4. Triggering alerts based on thresholds
// 5. Updating monitoring status
- if data.Code == 200 {
+ if data.Code == http.StatusOK {
rh.logger.Info("successfully processed collect data",
"jobID", job.ID,
"metricsName", data.Metrics)
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go
b/internal/collector/common/dispatcher/common_dispatcher.go
index d7a58c0..137304a 100644
--- a/internal/collector/common/dispatcher/common_dispatcher.go
+++ b/internal/collector/common/dispatcher/common_dispatcher.go
@@ -25,6 +25,7 @@ import (
"sync"
"time"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
@@ -34,9 +35,7 @@ import (
type CommonDispatcherImpl struct {
logger logger.Logger
metricsCollector MetricsCollector
- resultHandler ResultHandler
- ctx context.Context
- cancel context.CancelFunc
+ resultHandler collect.ResultHandler
mu sync.RWMutex
}
@@ -45,26 +44,17 @@ type MetricsCollector interface {
CollectMetrics(metrics *jobtypes.Metrics, job *jobtypes.Job, timeout
*jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData
}
-// ResultHandler interface for handling collection results
-type ResultHandler interface {
- HandleCollectData(data *jobtypes.CollectRepMetricsData, job
*jobtypes.Job) error
-}
-
// NewCommonDispatcher creates a new common dispatcher
-func NewCommonDispatcher(logger logger.Logger, metricsCollector
MetricsCollector, resultHandler ResultHandler) *CommonDispatcherImpl {
- ctx, cancel := context.WithCancel(context.Background())
-
+func NewCommonDispatcher(logger logger.Logger, metricsCollector
MetricsCollector, resultHandler collect.ResultHandler) *CommonDispatcherImpl {
return &CommonDispatcherImpl{
logger: logger.WithName("common-dispatcher"),
metricsCollector: metricsCollector,
resultHandler: resultHandler,
- ctx: ctx,
- cancel: cancel,
}
}
// DispatchMetricsTask dispatches a job by breaking it down into individual
metrics collection tasks
-func (cd *CommonDispatcherImpl) DispatchMetricsTask(job *jobtypes.Job, timeout
*jobtypes.Timeout) error {
+func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx context.Context, job
*jobtypes.Job, timeout *jobtypes.Timeout) error {
if job == nil {
return fmt.Errorf("job cannot be nil")
}
@@ -85,7 +75,7 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(job
*jobtypes.Job, timeout *
}
// Create collection context with timeout
- collectCtx, collectCancel := context.WithTimeout(cd.ctx,
cd.getCollectionTimeout(job))
+ collectCtx, collectCancel := context.WithTimeout(ctx,
cd.getCollectionTimeout(job))
defer collectCancel()
// Collect all metrics concurrently using channels (Go's way of
handling concurrency)
@@ -239,6 +229,5 @@ func (cd *CommonDispatcherImpl) getCollectionTimeout(job
*jobtypes.Job) time.Dur
// Stop stops the common dispatcher
func (cd *CommonDispatcherImpl) Stop() error {
cd.logger.Info("stopping common dispatcher")
- cd.cancel()
return nil
}
diff --git a/internal/collector/common/dispatcher/time_wheel.go
b/internal/collector/common/dispatcher/time_wheel.go
index 2040ec0..41b6860 100644
--- a/internal/collector/common/dispatcher/time_wheel.go
+++ b/internal/collector/common/dispatcher/time_wheel.go
@@ -36,15 +36,13 @@ type TimeDispatch struct {
commonDispatcher MetricsTaskDispatcher
cyclicTasks sync.Map // map[int64]*jobtypes.Timeout for cyclic jobs
tempTasks sync.Map // map[int64]*jobtypes.Timeout for one-time
jobs
- ctx context.Context
- cancel context.CancelFunc
started bool
mu sync.RWMutex
}
// MetricsTaskDispatcher interface for metrics task dispatching
type MetricsTaskDispatcher interface {
- DispatchMetricsTask(job *jobtypes.Job, timeout *jobtypes.Timeout) error
+ DispatchMetricsTask(ctx context.Context, job *jobtypes.Job, timeout
*jobtypes.Timeout) error
}
// HashedWheelTimer interface for time wheel operations
@@ -56,13 +54,9 @@ type HashedWheelTimer interface {
// NewTimeDispatch creates a new time dispatcher
func NewTimeDispatch(logger logger.Logger, commonDispatcher
MetricsTaskDispatcher) *TimeDispatch {
- ctx, cancel := context.WithCancel(context.Background())
-
td := &TimeDispatch{
logger: logger.WithName("time-dispatch"),
commonDispatcher: commonDispatcher,
- ctx: ctx,
- cancel: cancel,
started: false,
}
@@ -168,9 +162,6 @@ func (td *TimeDispatch) Stop() error {
td.logger.Info("stopping time dispatcher")
- // Cancel context
- td.cancel()
-
// Cancel all running tasks
td.cyclicTasks.Range(func(key, value interface{}) bool {
if timeout, ok := value.(*jobtypes.Timeout); ok {
@@ -201,7 +192,7 @@ func (td *TimeDispatch) Stop() error {
}
// Stats returns dispatcher statistics
-func (td *TimeDispatch) Stats() map[string]interface{} {
+func (td *TimeDispatch) Stats() map[string]any {
cyclicCount := 0
tempCount := 0
@@ -215,7 +206,7 @@ func (td *TimeDispatch) Stats() map[string]interface{} {
return true
})
- return map[string]interface{}{
+ return map[string]any{
"cyclicJobs": cyclicCount,
"tempJobs": tempCount,
"started": td.started,
diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go
b/internal/collector/common/dispatcher/wheel_timer_task.go
index ebda8f2..3eb53e3 100644
--- a/internal/collector/common/dispatcher/wheel_timer_task.go
+++ b/internal/collector/common/dispatcher/wheel_timer_task.go
@@ -20,6 +20,7 @@
package dispatcher
import (
+ "context"
"fmt"
"time"
@@ -59,9 +60,13 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout)
error {
startTime := time.Now()
+ // Create a context for this specific task execution
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
// Dispatch metrics task through common dispatcher
// This is where the job gets broken down into individual metric
collection tasks
- err := wtt.commonDispatcher.DispatchMetricsTask(wtt.job, timeout)
+ err := wtt.commonDispatcher.DispatchMetricsTask(ctx, wtt.job, timeout)
duration := time.Since(startTime)
diff --git a/internal/collector/common/job/job_server.go
b/internal/collector/common/job/job_server.go
index 31eed6f..f2f3ba2 100644
--- a/internal/collector/common/job/job_server.go
+++ b/internal/collector/common/job/job_server.go
@@ -21,9 +21,11 @@ package job
import (
"context"
+ "errors"
"fmt"
"sync"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher"
@@ -43,23 +45,23 @@ type TimeDispatcher interface {
// Config represents job service configuration
type Config struct {
clrServer.Server
+ TimeDispatch TimeDispatcher
}
// Runner implements the service runner interface
type Runner struct {
Config
- timeDispatch TimeDispatcher
- mu sync.RWMutex
- runningJobs map[int64]*jobtypes.Job
- ctx context.Context
- cancel context.CancelFunc
+ mu sync.RWMutex
+ runningJobs map[int64]*jobtypes.Job
+ ctx context.Context
+ cancel context.CancelFunc
}
// AddAsyncCollectJob adds a job to async collection scheduling
func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error {
if job == nil {
r.Logger.Error(nil, "job cannot be nil")
- return fmt.Errorf("job cannot be nil")
+ return errors.New("job cannot be nil")
}
r.Logger.Info("adding async collect job",
@@ -75,7 +77,7 @@ func (r *Runner) AddAsyncCollectJob(job *jobtypes.Job) error {
r.runningJobs[job.ID] = job
// Add job to time dispatcher for scheduling
- if err := r.timeDispatch.AddJob(job); err != nil {
+ if err := r.TimeDispatch.AddJob(job); err != nil {
delete(r.runningJobs, job.ID)
r.Logger.Error(err, "failed to add job to time dispatcher",
"jobID", job.ID)
return fmt.Errorf("failed to add job to time dispatcher: %w",
err)
@@ -96,7 +98,7 @@ func (r *Runner) RemoveAsyncCollectJob(jobID int64) error {
delete(r.runningJobs, jobID)
// Remove from time dispatcher
- if err := r.timeDispatch.RemoveJob(jobID); err != nil {
+ if err := r.TimeDispatch.RemoveJob(jobID); err != nil {
r.Logger.Error(err, "failed to remove job from time
dispatcher", "jobID", jobID)
return fmt.Errorf("failed to remove job from time dispatcher:
%w", err)
}
@@ -133,12 +135,14 @@ func New(srv *Config) *Runner {
// Create time dispatcher
timeDispatch := dispatcher.NewTimeDispatch(srv.Logger, commonDispatcher)
+ // Update the config with the time dispatcher
+ srv.TimeDispatch = timeDispatch
+
runner := &Runner{
- Config: *srv,
- timeDispatch: timeDispatch,
- runningJobs: make(map[int64]*jobtypes.Job),
- ctx: ctx,
- cancel: cancel,
+ Config: *srv,
+ runningJobs: make(map[int64]*jobtypes.Job),
+ ctx: ctx,
+ cancel: cancel,
}
return runner
@@ -149,14 +153,18 @@ func (r *Runner) Start(ctx context.Context) error {
r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner",
r.Info().Name)
r.Logger.Info("Starting job service runner")
+ // Initialize all collectors
+ r.Logger.Info("Initializing collectors...")
+ basic.InitializeAllCollectors(r.Logger)
+
// Start the time dispatcher
- if r.timeDispatch != nil {
- if err := r.timeDispatch.Start(ctx); err != nil {
+ if r.TimeDispatch != nil {
+ if err := r.TimeDispatch.Start(ctx); err != nil {
r.Logger.Error(err, "failed to start time dispatcher")
return fmt.Errorf("failed to start time dispatcher:
%w", err)
}
} else {
- return fmt.Errorf("time dispatcher is not initialized")
+ return errors.New("time dispatcher is not initialized")
}
r.Logger.Info("job service runner started successfully")
@@ -164,8 +172,8 @@ func (r *Runner) Start(ctx context.Context) error {
select {
case <-ctx.Done():
r.Logger.Info("job service runner stopped by context")
- if r.timeDispatch != nil {
- if err := r.timeDispatch.Stop(); err != nil {
+ if r.TimeDispatch != nil {
+ if err := r.TimeDispatch.Stop(); err != nil {
r.Logger.Error(err, "error stopping time
dispatcher")
}
}
@@ -187,8 +195,8 @@ func (r *Runner) Close() error {
r.cancel()
// Stop time dispatcher
- if r.timeDispatch != nil {
- if err := r.timeDispatch.Stop(); err != nil {
+ if r.TimeDispatch != nil {
+ if err := r.TimeDispatch.Stop(); err != nil {
r.Logger.Error(err, "error stopping time dispatcher")
// Continue cleanup despite error
}
@@ -199,8 +207,8 @@ func (r *Runner) Close() error {
// Clear running jobs
r.mu.Lock()
+ defer r.mu.Unlock()
r.runningJobs = make(map[int64]*jobtypes.Job)
- r.mu.Unlock()
r.Logger.Info("job service runner closed")
return nil
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]