This is an automated email from the ASF dual-hosted git repository.
liuhongyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new 2c1de38 feat(core): transport scheduler integration (#19)
2c1de38 is described below
commit 2c1de38d6a5cb615473aace7485b8ac38289421e
Author: 铁甲小宝 <[email protected]>
AuthorDate: Sat Sep 27 19:21:06 2025 +0800
feat(core): transport scheduler integration (#19)
* add: The first version of the modified scheduler
* 1. Fix the scheduling accuracy of critical time wheels and the
rescheduling of cyclic tasks
2. Support Java-compatible AES encryption and dynamic key management
3. Solve the problem of concurrent map access through one-time password
preprocessing
4. Add comprehensive operation statistics and monitoring functions
5. Add manager message reception
* modified:
1. Use the key when deleting the test.
2. Reset the order of magnitude of the request time.
---
Dockerfile | 63 ++++
Makefile | 11 +-
.../docker-compose.yml => docker-compose.yml | 4 +-
internal/cmd/server.go | 19 +-
.../collector/basic/database/jdbc_collector.go | 105 +++---
.../common/collect/dispatch/metrics_collector.go | 10 +-
.../collector/common/collect/result_handler.go | 9 +-
.../common/collect/strategy/strategy_factory.go | 9 -
.../common/dispatcher/common_dispatcher.go | 67 +++-
.../common/dispatcher/hashed_wheel_timer.go | 41 ++-
internal/collector/common/dispatcher/time_wheel.go | 321 ++++++++++++++++-
.../common/dispatcher/wheel_timer_task.go | 57 ++-
internal/collector/common/router/message_router.go | 269 ++++++++++++++
internal/collector/common/transport/transport.go | 28 +-
internal/collector/common/types/job/job_types.go | 9 +-
.../collector/common/types/job/metrics_types.go | 111 ++++--
internal/transport/netty_client.go | 8 +-
internal/transport/processors.go | 265 ++++++++++++--
internal/util/crypto/aes_util.go | 194 ++++++++++
internal/util/param/param_replacer.go | 391 +++++++++++++++++++++
tools/docker/hcg/Dockerfile | 35 --
tools/make/golang.mk | 22 +-
tools/make/image.mk | 58 ---
23 files changed, 1810 insertions(+), 296 deletions(-)
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..74c38a7
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+FROM golang:1.23-alpine AS golang-builder
+
+ARG GOPROXY
+# ENV GOPROXY ${GOPROXY:-direct}
+# ENV GOPROXY=https://proxy.golang.com.cn,direct
+
+ENV GOPATH /go
+ENV GOROOT /usr/local/go
+ENV PACKAGE hertzbeat.apache.org/hertzbeat-collector-go
+ENV BUILD_DIR /app
+
+COPY . ${BUILD_DIR}
+WORKDIR ${BUILD_DIR}
+RUN apk --no-cache add build-base git bash
+
+RUN make init && \
+ make fmt && \
+ make go-lint &&\
+ make build
+
+RUN chmod +x bin/collector
+
+FROM alpine
+
+ARG TIMEZONE
+ENV TIMEZONE=${TIMEZONE:-"Asia/Shanghai"}
+
+RUN apk update \
+ && apk --no-cache add \
+ bash \
+ ca-certificates \
+ curl \
+ dumb-init \
+ gettext \
+ openssh \
+ sqlite \
+ gnupg \
+ tzdata \
+ && ln -sf /usr/share/zoneinfo/${TIMEZONE} /etc/localtime \
+ && echo "${TIMEZONE}" > /etc/timezone
+
+COPY --from=golang-builder /app/bin/collector /usr/local/bin/collector
+COPY --from=golang-builder /app/etc/hertzbeat-collector.yml
/etc/hertzbeat-collector.yml
+
+EXPOSE 8090
+ENTRYPOINT ["collector", "server", "--config", "/etc/hertzbeat-collector.yml"]
diff --git a/Makefile b/Makefile
index 6b33134..cd02530 100644
--- a/Makefile
+++ b/Makefile
@@ -17,12 +17,11 @@
_run:
@$(MAKE) --warn-undefined-variables \
- -f tools/make/common.mk \
- -f tools/make/golang.mk \
- -f tools/make/linter.mk \
- -f tools/make/image.mk \
- -f tools/make/tools.mk \
- $(MAKECMDGOALS)
+ -f tools/make/common.mk \
+ -f tools/make/golang.mk \
+ -f tools/make/linter.mk \
+ -f tools/make/tools.mk \
+ $(MAKECMDGOALS)
.PHONY: _run
diff --git a/tools/docker/docker-compose/docker-compose.yml b/docker-compose.yml
similarity index 93%
rename from tools/docker/docker-compose/docker-compose.yml
rename to docker-compose.yml
index 427b692..7c158b5 100644
--- a/tools/docker/docker-compose/docker-compose.yml
+++ b/docker-compose.yml
@@ -19,11 +19,11 @@ services:
# collector go service
hertzbeat-collector:
- image: hertzbeat-collector-go:latest
+ image: hertzbeat/hertzbeat-collector-go:latest
container_name: hertzbeat-collector-go
restart: on-failure
ports:
- - "8080:8080"
+ - "8090:8090"
networks:
hcg-network:
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 5ff4c45..1e98ceb 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -108,17 +108,20 @@ func server(ctx context.Context, logOut io.Writer) error {
}
func startRunners(ctx context.Context, cfg *clrserver.Server) error {
+ // Create job server first
+ jobRunner := jobserver.New(&jobserver.Config{
+ Server: *cfg,
+ })
+
+ // Create transport server and connect it to job server
+ transportRunner := transportserver.NewFromConfig(cfg.Config)
+ transportRunner.SetJobScheduler(jobRunner) // Connect transport to job
scheduler
+
runners := []struct {
runner Runner[collectortypes.Info]
}{
- {
- jobserver.New(&jobserver.Config{
- Server: *cfg,
- }),
- },
- {
- transportserver.NewFromConfig(cfg.Config),
- },
+ {jobRunner},
+ {transportRunner},
// todo; add metrics
}
diff --git a/internal/collector/basic/database/jdbc_collector.go
b/internal/collector/basic/database/jdbc_collector.go
index 01fa7f2..dd74b2f 100644
--- a/internal/collector/basic/database/jdbc_collector.go
+++ b/internal/collector/basic/database/jdbc_collector.go
@@ -30,6 +30,7 @@ import (
_ "github.com/microsoft/go-mssqldb"
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
)
const (
@@ -67,6 +68,13 @@ func NewJDBCCollector(logger logger.Logger) *JDBCCollector {
}
}
+// extractJDBCConfig extracts JDBC configuration from interface{} type
+// This function uses the parameter replacer for consistent configuration
extraction
+func extractJDBCConfig(jdbcInterface interface{}) (*jobtypes.JDBCProtocol,
error) {
+ replacer := param.NewReplacer()
+ return replacer.ExtractJDBCConfig(jdbcInterface)
+}
+
// PreCheck validates the JDBC metrics configuration
func (jc *JDBCCollector) PreCheck(metrics *jobtypes.Metrics) error {
if metrics == nil {
@@ -77,44 +85,51 @@ func (jc *JDBCCollector) PreCheck(metrics
*jobtypes.Metrics) error {
return fmt.Errorf("JDBC protocol configuration is required")
}
- jdbc := metrics.JDBC
+ // Extract JDBC configuration
+ jdbcConfig, err := extractJDBCConfig(metrics.JDBC)
+ if err != nil {
+ return fmt.Errorf("invalid JDBC configuration: %w", err)
+ }
+ if jdbcConfig == nil {
+ return fmt.Errorf("JDBC configuration is required")
+ }
// Validate required fields when URL is not provided
- if jdbc.URL == "" {
- if jdbc.Host == "" {
+ if jdbcConfig.URL == "" {
+ if jdbcConfig.Host == "" {
return fmt.Errorf("host is required when URL is not
provided")
}
- if jdbc.Port == "" {
+ if jdbcConfig.Port == "" {
return fmt.Errorf("port is required when URL is not
provided")
}
- if jdbc.Platform == "" {
+ if jdbcConfig.Platform == "" {
return fmt.Errorf("platform is required when URL is not
provided")
}
}
// Validate platform
- if jdbc.Platform != "" {
- switch jdbc.Platform {
+ if jdbcConfig.Platform != "" {
+ switch jdbcConfig.Platform {
case PlatformMySQL, PlatformMariaDB, PlatformPostgreSQL,
PlatformSQLServer, PlatformOracle:
// Valid platforms
default:
- return fmt.Errorf("unsupported database platform: %s",
jdbc.Platform)
+ return fmt.Errorf("unsupported database platform: %s",
jdbcConfig.Platform)
}
}
// Validate query type
- if jdbc.QueryType != "" {
- switch jdbc.QueryType {
+ if jdbcConfig.QueryType != "" {
+ switch jdbcConfig.QueryType {
case QueryTypeOneRow, QueryTypeMultiRow, QueryTypeColumns,
QueryTypeRunScript:
// Valid query types
default:
- return fmt.Errorf("unsupported query type: %s",
jdbc.QueryType)
+ return fmt.Errorf("unsupported query type: %s",
jdbcConfig.QueryType)
}
}
// Validate SQL for most query types
- if jdbc.QueryType != QueryTypeRunScript && jdbc.SQL == "" {
- return fmt.Errorf("SQL is required for query type: %s",
jdbc.QueryType)
+ if jdbcConfig.QueryType != QueryTypeRunScript && jdbcConfig.SQL == "" {
+ return fmt.Errorf("SQL is required for query type: %s",
jdbcConfig.QueryType)
}
return nil
@@ -123,27 +138,35 @@ func (jc *JDBCCollector) PreCheck(metrics
*jobtypes.Metrics) error {
// Collect performs JDBC metrics collection
func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics)
*jobtypes.CollectRepMetricsData {
startTime := time.Now()
- jdbc := metrics.JDBC
- jc.logger.Info("starting JDBC collection",
- "host", jdbc.Host,
- "port", jdbc.Port,
- "platform", jdbc.Platform,
- "database", jdbc.Database,
- "queryType", jdbc.QueryType)
+ // Extract JDBC configuration
+ jdbcConfig, err := extractJDBCConfig(metrics.JDBC)
+ if err != nil {
+ jc.logger.Error(err, "failed to extract JDBC config")
+ return jc.createFailResponse(metrics, CodeFail,
fmt.Sprintf("JDBC config error: %v", err))
+ }
+ if jdbcConfig == nil {
+ return jc.createFailResponse(metrics, CodeFail, "JDBC
configuration is required")
+ }
+
+ // Debug level only for collection start
+ jc.logger.V(1).Info("starting JDBC collection",
+ "host", jdbcConfig.Host,
+ "platform", jdbcConfig.Platform,
+ "queryType", jdbcConfig.QueryType)
// Get timeout
- timeout := jc.getTimeout(jdbc.Timeout)
+ timeout := jc.getTimeout(jdbcConfig.Timeout)
// Get database URL
- databaseURL, err := jc.constructDatabaseURL(jdbc)
+ databaseURL, err := jc.constructDatabaseURL(jdbcConfig)
if err != nil {
jc.logger.Error(err, "failed to construct database URL")
return jc.createFailResponse(metrics, CodeFail,
fmt.Sprintf("Database URL error: %v", err))
}
// Create database connection
- db, err := jc.getConnection(databaseURL, jdbc.Username, jdbc.Password,
timeout)
+ db, err := jc.getConnection(databaseURL, jdbcConfig.Username,
jdbcConfig.Password, timeout)
if err != nil {
jc.logger.Error(err, "failed to connect to database")
return jc.createFailResponse(metrics, CodeUnConnectable,
fmt.Sprintf("Connection error: %v", err))
@@ -153,26 +176,27 @@ func (jc *JDBCCollector) Collect(metrics
*jobtypes.Metrics) *jobtypes.CollectRep
// Execute query based on type with context
response := jc.createSuccessResponse(metrics)
- switch jdbc.QueryType {
+ switch jdbcConfig.QueryType {
case QueryTypeOneRow:
- err = jc.queryOneRow(db, jdbc.SQL, metrics.Aliasfields,
response)
+ err = jc.queryOneRow(db, jdbcConfig.SQL, metrics.Aliasfields,
response)
case QueryTypeMultiRow:
- err = jc.queryMultiRow(db, jdbc.SQL, metrics.Aliasfields,
response)
+ err = jc.queryMultiRow(db, jdbcConfig.SQL, metrics.Aliasfields,
response)
case QueryTypeColumns:
- err = jc.queryColumns(db, jdbc.SQL, metrics.Aliasfields,
response)
+ err = jc.queryColumns(db, jdbcConfig.SQL, metrics.Aliasfields,
response)
case QueryTypeRunScript:
- err = jc.runScript(db, jdbc.SQL, response)
+ err = jc.runScript(db, jdbcConfig.SQL, response)
default:
- err = fmt.Errorf("unsupported query type: %s", jdbc.QueryType)
+ err = fmt.Errorf("unsupported query type: %s",
jdbcConfig.QueryType)
}
if err != nil {
- jc.logger.Error(err, "query execution failed", "queryType",
jdbc.QueryType)
+ jc.logger.Error(err, "query execution failed", "queryType",
jdbcConfig.QueryType)
return jc.createFailResponse(metrics, CodeFail,
fmt.Sprintf("Query error: %v", err))
}
duration := time.Since(startTime)
- jc.logger.Info("JDBC collection completed",
+ // Debug level only for successful completion
+ jc.logger.V(1).Info("JDBC collection completed",
"duration", duration,
"rowCount", len(response.Values))
@@ -195,9 +219,8 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string)
time.Duration {
return duration
}
- // If it's a pure number, treat it as seconds (more reasonable for
database connections)
if timeout, err := strconv.Atoi(timeoutStr); err == nil {
- return time.Duration(timeout) * time.Second
+ return time.Duration(timeout) * time.Millisecond
}
return 30 * time.Second // fallback to default
@@ -205,10 +228,6 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string)
time.Duration {
// constructDatabaseURL constructs the database connection URL
func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol)
(string, error) {
- // If URL is provided directly, use it
- if jdbc.URL != "" {
- return jdbc.URL, nil
- }
// Construct URL based on platform
host := jdbc.Host
@@ -217,7 +236,8 @@ func (jc *JDBCCollector) constructDatabaseURL(jdbc
*jobtypes.JDBCProtocol) (stri
switch jdbc.Platform {
case PlatformMySQL, PlatformMariaDB:
- return
fmt.Sprintf("mysql://%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4",
+ // MySQL DSN format:
[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN]
+ return
fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4",
jdbc.Username, jdbc.Password, host, port, database), nil
case PlatformPostgreSQL:
return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable",
@@ -234,14 +254,13 @@ func (jc *JDBCCollector) constructDatabaseURL(jdbc
*jobtypes.JDBCProtocol) (stri
func (jc *JDBCCollector) getConnection(databaseURL, username, password string,
timeout time.Duration) (*sql.DB, error) {
// Extract driver name from URL
var driverName string
- if strings.HasPrefix(databaseURL, "mysql://") {
- driverName = "mysql"
- // Remove mysql:// prefix for the driver
- databaseURL = strings.TrimPrefix(databaseURL, "mysql://")
- } else if strings.HasPrefix(databaseURL, "postgres://") {
+ if strings.HasPrefix(databaseURL, "postgres://") {
driverName = "postgres"
} else if strings.HasPrefix(databaseURL, "sqlserver://") {
driverName = "sqlserver"
+ } else if strings.Contains(databaseURL, "@tcp(") {
+ // MySQL DSN format (no protocol prefix)
+ driverName = "mysql"
} else {
return nil, fmt.Errorf("unsupported database URL format: %s",
databaseURL)
}
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go
b/internal/collector/common/collect/dispatch/metrics_collector.go
index 85e7580..bc3ea2e 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -53,8 +53,8 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
go func() {
defer close(resultChan)
- mc.logger.Info("starting metrics collection",
- "jobID", job.ID,
+ // Debug level only for collection start
+ mc.logger.V(1).Info("starting metrics collection",
"metricsName", metrics.Name,
"protocol", metrics.Protocol)
@@ -72,7 +72,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
return
}
- // Perform the actual collection
+ // Perform the actual collection with metrics (parameters
should already be replaced by CommonDispatcher)
result := collector.Collect(metrics)
// Enrich result with job information
@@ -101,9 +101,9 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
duration := time.Since(startTime)
+ // Only log failures at INFO level, success at debug level
if result != nil && result.Code == http.StatusOK {
- mc.logger.Info("metrics collection completed
successfully",
- "jobID", job.ID,
+ mc.logger.V(1).Info("metrics collection completed
successfully",
"metricsName", metrics.Name,
"protocol", metrics.Protocol,
"duration", duration,
diff --git a/internal/collector/common/collect/result_handler.go
b/internal/collector/common/collect/result_handler.go
index 7f5a304..22ceb1f 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/collector/common/collect/result_handler.go
@@ -52,9 +52,8 @@ func (rh *ResultHandlerImpl) HandleCollectData(data
*jobtypes.CollectRepMetricsD
return fmt.Errorf("collect data is nil")
}
- rh.logger.Info("handling collect data",
- "jobID", job.ID,
- "monitorID", job.MonitorID,
+ // Debug level only for data handling
+ rh.logger.V(1).Info("handling collect data",
"metricsName", data.Metrics,
"code", data.Code,
"valuesCount", len(data.Values))
@@ -67,9 +66,9 @@ func (rh *ResultHandlerImpl) HandleCollectData(data
*jobtypes.CollectRepMetricsD
// 4. Triggering alerts based on thresholds
// 5. Updating monitoring status
+ // Only log failures at INFO level, success at debug level
if data.Code == http.StatusOK {
- rh.logger.Info("successfully processed collect data",
- "jobID", job.ID,
+ rh.logger.V(1).Info("successfully processed collect data",
"metricsName", data.Metrics)
} else {
rh.logger.Info("received failed collect data",
diff --git a/internal/collector/common/collect/strategy/strategy_factory.go
b/internal/collector/common/collect/strategy/strategy_factory.go
index fa951b0..9fc9af1 100644
--- a/internal/collector/common/collect/strategy/strategy_factory.go
+++ b/internal/collector/common/collect/strategy/strategy_factory.go
@@ -76,15 +76,6 @@ func InitializeCollectors(logger logger.Logger) {
}
globalRegistry.initialized = true
-
- // Log initialization results
- protocols := make([]string, 0, len(globalRegistry.collectors))
- for protocol := range globalRegistry.collectors {
- protocols = append(protocols, protocol)
- }
- logger.Info("collectors initialized from factories",
- "protocols", protocols,
- "count", len(protocols))
}
// Register a collector directly (legacy method, kept for compatibility)
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go
b/internal/collector/common/dispatcher/common_dispatcher.go
index 137304a..8d37be5 100644
--- a/internal/collector/common/dispatcher/common_dispatcher.go
+++ b/internal/collector/common/dispatcher/common_dispatcher.go
@@ -28,6 +28,7 @@ import (
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
)
// CommonDispatcherImpl is responsible for breaking down jobs into individual
metrics collection tasks
@@ -59,9 +60,9 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx
context.Context, job *jo
return fmt.Errorf("job cannot be nil")
}
- cd.logger.Info("dispatching metrics task",
+ // Dispatching metrics task - only log at debug level
+ cd.logger.V(1).Info("dispatching metrics task",
"jobID", job.ID,
- "monitorID", job.MonitorID,
"app", job.App,
"metricsCount", len(job.Metrics))
@@ -74,6 +75,16 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx
context.Context, job *jo
return nil
}
+ // Replace parameters in job configuration ONCE before concurrent
collection
+ // This avoids concurrent map access issues in MetricsCollector
+ paramReplacer := param.NewReplacer()
+ processedJob, err := paramReplacer.ReplaceJobParams(job)
+ if err != nil {
+ cd.logger.Error(err, "failed to replace job parameters",
+ "jobID", job.ID)
+ return fmt.Errorf("parameter replacement failed: %w", err)
+ }
+
// Create collection context with timeout
collectCtx, collectCancel := context.WithTimeout(ctx,
cd.getCollectionTimeout(job))
defer collectCancel()
@@ -82,13 +93,26 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx
context.Context, job *jo
resultChannels := make([]chan *jobtypes.CollectRepMetricsData,
len(metricsToCollect))
for i, metrics := range metricsToCollect {
- cd.logger.Info("starting metrics collection",
- "jobID", job.ID,
+ // Starting metrics collection - debug level only
+ cd.logger.V(1).Info("starting metrics collection",
"metricsName", metrics.Name,
"protocol", metrics.Protocol)
- // Start metrics collection in goroutine
- resultChannels[i] = cd.metricsCollector.CollectMetrics(metrics,
job, timeout)
+ // Find the processed metrics in the processed job
+ var processedMetrics *jobtypes.Metrics
+ for j := range processedJob.Metrics {
+ if processedJob.Metrics[j].Name == metrics.Name {
+ processedMetrics = &processedJob.Metrics[j]
+ break
+ }
+ }
+ if processedMetrics == nil {
+ cd.logger.Error(nil, "processed metrics not found",
"metricsName", metrics.Name)
+ continue
+ }
+
+ // Start metrics collection in goroutine with processed data
+ resultChannels[i] =
cd.metricsCollector.CollectMetrics(processedMetrics, processedJob, timeout)
}
// Collect results from all metrics collection tasks
@@ -100,8 +124,8 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx
context.Context, job *jo
case result := <-resultChan:
if result != nil {
results = append(results, result)
- cd.logger.Info("received metrics result",
- "jobID", job.ID,
+ // Metrics result received - debug level only
+ cd.logger.V(1).Info("received metrics result",
"metricsName", metricsToCollect[i].Name,
"code", result.Code)
}
@@ -123,18 +147,26 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx
context.Context, job *jo
return err
}
- cd.logger.Info("successfully dispatched metrics task",
- "jobID", job.ID,
- "resultsCount", len(results),
- "errorsCount", len(errors),
- "duration", duration)
+ // Only log summary for successful tasks if there were errors
+ if len(errors) > 0 {
+ cd.logger.Info("metrics task completed with errors",
+ "jobID", job.ID,
+ "errorsCount", len(errors),
+ "duration", duration)
+ } else {
+ cd.logger.V(1).Info("metrics task completed successfully",
+ "jobID", job.ID,
+ "resultsCount", len(results),
+ "duration", duration)
+ }
return nil
}
// handleResults processes the collection results and decides on next actions
func (cd *CommonDispatcherImpl) handleResults(results
[]*jobtypes.CollectRepMetricsData, job *jobtypes.Job, errors []error) error {
- cd.logger.Info("handling collection results",
+ // Debug level only for result handling
+ cd.logger.V(1).Info("handling collection results",
"jobID", job.ID,
"resultsCount", len(results),
"errorsCount", len(errors))
@@ -163,7 +195,8 @@ func (cd *CommonDispatcherImpl) handleResults(results
[]*jobtypes.CollectRepMetr
// evaluateNextActions decides what to do next based on collection results
func (cd *CommonDispatcherImpl) evaluateNextActions(job *jobtypes.Job, results
[]*jobtypes.CollectRepMetricsData, errors []error) {
- cd.logger.Info("evaluating next actions",
+ // Debug level only for next actions evaluation
+ cd.logger.V(1).Info("evaluating next actions",
"jobID", job.ID,
"resultsCount", len(results),
"errorsCount", len(errors))
@@ -172,13 +205,13 @@ func (cd *CommonDispatcherImpl) evaluateNextActions(job
*jobtypes.Job, results [
hasNextLevel := cd.hasNextLevelMetrics(job, results)
if hasNextLevel {
- cd.logger.Info("job has next level metrics to collect",
"jobID", job.ID)
+ cd.logger.V(1).Info("job has next level metrics to collect",
"jobID", job.ID)
// TODO: Schedule next level metrics collection
}
// For cyclic jobs, the rescheduling is handled by WheelTimerTask
if job.IsCyclic {
- cd.logger.Info("cyclic job will be rescheduled by timer",
"jobID", job.ID)
+ cd.logger.V(1).Info("cyclic job will be rescheduled by timer",
"jobID", job.ID)
}
// TODO: Send results to data queue for further processing
diff --git a/internal/collector/common/dispatcher/hashed_wheel_timer.go
b/internal/collector/common/dispatcher/hashed_wheel_timer.go
index 3ebe931..9100205 100644
--- a/internal/collector/common/dispatcher/hashed_wheel_timer.go
+++ b/internal/collector/common/dispatcher/hashed_wheel_timer.go
@@ -100,7 +100,8 @@ func (hwt *hashedWheelTimer) NewTimeout(task
jobtypes.TimerTask, delay time.Dura
bucket.timeouts = append(bucket.timeouts, timeout)
bucket.mu.Unlock()
- hwt.logger.Info("created timeout",
+ // Log timeout creation only for debugging
+ hwt.logger.V(1).Info("created timeout",
"delay", delay,
"bucketIndex", bucketIndex,
"targetTick", targetTick)
@@ -114,9 +115,7 @@ func (hwt *hashedWheelTimer) Start(ctx context.Context)
error {
return nil // already started
}
- hwt.logger.Info("starting hashed wheel timer",
- "wheelSize", hwt.wheelSize,
- "tickDuration", hwt.tickDuration)
+ hwt.logger.Info("starting hashed wheel timer")
hwt.startTime = time.Now()
hwt.ticker = time.NewTicker(hwt.tickDuration)
@@ -138,8 +137,6 @@ func (hwt *hashedWheelTimer) Stop() error {
return nil // already stopped
}
- hwt.logger.Info("stopping hashed wheel timer")
-
hwt.cancel()
if hwt.ticker != nil {
@@ -174,6 +171,14 @@ func (hwt *hashedWheelTimer) tick() {
bucket := hwt.wheel[bucketIndex]
bucket.mu.Lock()
+ // Only log tick processing if there are timeouts to process
+ if len(bucket.timeouts) > 0 {
+ hwt.logger.V(1).Info("processing tick",
+ "currentTick", currentTick,
+ "bucketIndex", bucketIndex,
+ "timeoutsCount", len(bucket.timeouts))
+ }
+
// Process expired timeouts
var remaining []*jobtypes.Timeout
for _, timeout := range bucket.timeouts {
@@ -181,8 +186,18 @@ func (hwt *hashedWheelTimer) tick() {
continue // skip cancelled timeouts
}
- // Check if timeout has expired
- if time.Now().After(timeout.Deadline()) {
+ now := time.Now()
+ deadline := timeout.Deadline()
+
+ // Check if timeout should execute based on wheel position
+ // A timeout should execute when currentTick >= its targetTick
+ wheelIndex := timeout.WheelIndex()
+ if currentTick >= int64(wheelIndex) {
+ // Only log actual task execution at INFO level
+ hwt.logger.Info("executing scheduled task",
+ "currentTick", currentTick,
+ "targetTick", wheelIndex)
+
// Execute the timeout task asynchronously
go func(t *jobtypes.Timeout) {
defer func() {
@@ -196,7 +211,12 @@ func (hwt *hashedWheelTimer) tick() {
}
}(timeout)
} else {
- // Not yet expired, keep in bucket
+ // Not yet time to execute, keep in bucket
+ hwt.logger.V(1).Info("timeout not ready by wheel
position",
+ "currentTick", currentTick,
+ "targetTick", wheelIndex,
+ "deadline", deadline,
+ "now", now)
remaining = append(remaining, timeout)
}
}
@@ -216,10 +236,11 @@ func (hwt *hashedWheelTimer) GetStats()
map[string]interface{} {
return map[string]interface{}{
"wheelSize": hwt.wheelSize,
- "tickDuration": hwt.tickDuration,
+ "tickDuration": hwt.tickDuration.String(),
"currentTick": atomic.LoadInt64(&hwt.currentTick),
"totalTimeouts": totalTimeouts,
"started": atomic.LoadInt32(&hwt.started) == 1,
"stopped": atomic.LoadInt32(&hwt.stopped) == 1,
+ "wheelPeriod": (time.Duration(hwt.wheelSize) *
hwt.tickDuration).String(),
}
}
diff --git a/internal/collector/common/dispatcher/time_wheel.go
b/internal/collector/common/dispatcher/time_wheel.go
index 30d02ad..d8c7aa1 100644
--- a/internal/collector/common/dispatcher/time_wheel.go
+++ b/internal/collector/common/dispatcher/time_wheel.go
@@ -29,6 +29,16 @@ import (
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
+// JobInfo holds information about a scheduled job
+type JobInfo struct {
+ Job *jobtypes.Job
+ Timeout *jobtypes.Timeout
+ CreatedAt time.Time
+ LastExecutedAt time.Time
+ NextExecutionAt time.Time
+ ExecutionCount int64
+}
+
// TimeDispatch manages time-based job scheduling using a hashed wheel timer
type TimeDispatch struct {
logger logger.Logger
@@ -36,8 +46,26 @@ type TimeDispatch struct {
commonDispatcher MetricsTaskDispatcher
cyclicTasks sync.Map // map[int64]*jobtypes.Timeout for cyclic jobs
tempTasks sync.Map // map[int64]*jobtypes.Timeout for one-time
jobs
+ jobInfos sync.Map // map[int64]*JobInfo for detailed job
information
started bool
mu sync.RWMutex
+
+ // Statistics
+ stats *DispatcherStats
+ statsLogger *time.Ticker
+ statsCancel context.CancelFunc
+}
+
+// DispatcherStats holds dispatcher statistics
+type DispatcherStats struct {
+ StartTime time.Time
+ TotalJobsAdded int64
+ TotalJobsRemoved int64
+ TotalJobsExecuted int64
+ TotalJobsCompleted int64
+ TotalJobsFailed int64
+ LastExecutionTime time.Time
+ mu sync.RWMutex
}
// MetricsTaskDispatcher interface for metrics task dispatching
@@ -50,6 +78,7 @@ type HashedWheelTimer interface {
NewTimeout(task jobtypes.TimerTask, delay time.Duration)
*jobtypes.Timeout
Start(ctx context.Context) error
Stop() error
+ GetStats() map[string]interface{}
}
// NewTimeDispatch creates a new time dispatcher
@@ -58,6 +87,9 @@ func NewTimeDispatch(logger logger.Logger, commonDispatcher
MetricsTaskDispatche
logger: logger.WithName("time-dispatch"),
commonDispatcher: commonDispatcher,
started: false,
+ stats: &DispatcherStats{
+ StartTime: time.Now(),
+ },
}
// Create hashed wheel timer with reasonable defaults
@@ -74,32 +106,56 @@ func (td *TimeDispatch) AddJob(job *jobtypes.Job) error {
return fmt.Errorf("job cannot be nil")
}
- td.logger.Info("adding job to time dispatcher",
+ // Log job addition at debug level
+ td.logger.V(1).Info("adding job",
"jobID", job.ID,
- "isCyclic", job.IsCyclic,
"interval", job.DefaultInterval)
+ // Update statistics
+ td.stats.mu.Lock()
+ td.stats.TotalJobsAdded++
+ td.stats.mu.Unlock()
+
// Create wheel timer task
- timerTask := NewWheelTimerTask(job, td.commonDispatcher, td.logger)
+ timerTask := NewWheelTimerTask(job, td.commonDispatcher, td, td.logger)
// Calculate delay
var delay time.Duration
if job.DefaultInterval > 0 {
- delay = time.Duration(job.DefaultInterval) * time.Second
+ delay = time.Duration(job.DefaultInterval) * time.Millisecond
+ td.logger.V(1).Info("calculated delay",
+ "interval", job.DefaultInterval,
+ "delay", delay)
} else {
- delay = 30 * time.Second // default interval
+ delay = 10 * time.Second
}
// Create timeout
timeout := td.wheelTimer.NewTimeout(timerTask, delay)
- // Store timeout based on job type
+ // Create job info
+ now := time.Now()
+ jobInfo := &JobInfo{
+ Job: job,
+ Timeout: timeout,
+ CreatedAt: now,
+ NextExecutionAt: now.Add(delay),
+ ExecutionCount: 0,
+ }
+
+ // Store timeout and job info based on job type
if job.IsCyclic {
td.cyclicTasks.Store(job.ID, timeout)
- td.logger.Info("added cyclic job to scheduler", "jobID",
job.ID, "delay", delay)
+ td.jobInfos.Store(job.ID, jobInfo)
+ td.logger.Info("scheduled cyclic job",
+ "jobID", job.ID,
+ "nextExecution", jobInfo.NextExecutionAt)
} else {
td.tempTasks.Store(job.ID, timeout)
- td.logger.Info("added one-time job to scheduler", "jobID",
job.ID, "delay", delay)
+ td.jobInfos.Store(job.ID, jobInfo)
+ td.logger.Info("scheduled one-time job",
+ "jobID", job.ID,
+ "nextExecution", jobInfo.NextExecutionAt)
}
return nil
@@ -107,13 +163,19 @@ func (td *TimeDispatch) AddJob(job *jobtypes.Job) error {
// RemoveJob removes a job from the scheduler
func (td *TimeDispatch) RemoveJob(jobID int64) error {
- td.logger.Info("removing job from time dispatcher", "jobID", jobID)
+ td.logger.V(1).Info("removing job", "jobID", jobID)
+
+ // Update statistics
+ td.stats.mu.Lock()
+ td.stats.TotalJobsRemoved++
+ td.stats.mu.Unlock()
// Try to remove from cyclic tasks
if timeoutInterface, exists := td.cyclicTasks.LoadAndDelete(jobID);
exists {
if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
timeout.Cancel()
- td.logger.Info("removed cyclic job", "jobID", jobID)
+ td.jobInfos.Delete(jobID)
+ td.logger.V(1).Info("removed cyclic job", "jobID",
jobID)
return nil
}
}
@@ -122,7 +184,8 @@ func (td *TimeDispatch) RemoveJob(jobID int64) error {
if timeoutInterface, exists := td.tempTasks.LoadAndDelete(jobID);
exists {
if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
timeout.Cancel()
- td.logger.Info("removed one-time job", "jobID", jobID)
+ td.jobInfos.Delete(jobID)
+ td.logger.V(1).Info("removed one-time job", "jobID",
jobID)
return nil
}
}
@@ -146,8 +209,11 @@ func (td *TimeDispatch) Start(ctx context.Context) error {
return fmt.Errorf("failed to start wheel timer: %w", err)
}
+ // Start periodic statistics logging
+ td.startStatsLogging(ctx)
+
td.started = true
- td.logger.Info("time dispatcher started successfully")
+ // Started successfully
return nil
}
@@ -162,6 +228,9 @@ func (td *TimeDispatch) Stop() error {
td.logger.Info("stopping time dispatcher")
+ // Stop statistics logging
+ td.stopStatsLogging()
+
// Cancel all running tasks
td.cyclicTasks.Range(func(key, value interface{}) bool {
if timeout, ok := value.(*jobtypes.Timeout); ok {
@@ -180,6 +249,7 @@ func (td *TimeDispatch) Stop() error {
// Clear maps
td.cyclicTasks = sync.Map{}
td.tempTasks = sync.Map{}
+ td.jobInfos = sync.Map{}
// Stop wheel timer
if err := td.wheelTimer.Stop(); err != nil {
@@ -206,9 +276,230 @@ func (td *TimeDispatch) Stats() map[string]any {
return true
})
+ td.stats.mu.RLock()
+ uptime := time.Since(td.stats.StartTime)
+ totalJobsAdded := td.stats.TotalJobsAdded
+ totalJobsRemoved := td.stats.TotalJobsRemoved
+ totalJobsExecuted := td.stats.TotalJobsExecuted
+ totalJobsCompleted := td.stats.TotalJobsCompleted
+ totalJobsFailed := td.stats.TotalJobsFailed
+ lastExecutionTime := td.stats.LastExecutionTime
+ td.stats.mu.RUnlock()
+
return map[string]any{
- "cyclicJobs": cyclicCount,
- "tempJobs": tempCount,
- "started": td.started,
+ "cyclicJobs": cyclicCount,
+ "tempJobs": tempCount,
+ "started": td.started,
+ "uptime": uptime,
+ "totalJobsAdded": totalJobsAdded,
+ "totalJobsRemoved": totalJobsRemoved,
+ "totalJobsExecuted": totalJobsExecuted,
+ "totalJobsCompleted": totalJobsCompleted,
+ "totalJobsFailed": totalJobsFailed,
+ "lastExecutionTime": lastExecutionTime,
+ }
+}
+
+// RecordJobExecution records job execution statistics
+func (td *TimeDispatch) RecordJobExecution() {
+ td.stats.mu.Lock()
+ td.stats.TotalJobsExecuted++
+ td.stats.LastExecutionTime = time.Now()
+ td.stats.mu.Unlock()
+}
+
+// RecordJobCompleted records job completion
+func (td *TimeDispatch) RecordJobCompleted() {
+ td.stats.mu.Lock()
+ td.stats.TotalJobsCompleted++
+ td.stats.mu.Unlock()
+}
+
+// RecordJobFailed records job failure
+func (td *TimeDispatch) RecordJobFailed() {
+ td.stats.mu.Lock()
+ td.stats.TotalJobsFailed++
+ td.stats.mu.Unlock()
+}
+
+// UpdateJobExecution updates job execution information
+func (td *TimeDispatch) UpdateJobExecution(jobID int64) {
+ if jobInfoInterface, exists := td.jobInfos.Load(jobID); exists {
+ if jobInfo, ok := jobInfoInterface.(*JobInfo); ok {
+ now := time.Now()
+ jobInfo.LastExecutedAt = now
+ jobInfo.ExecutionCount++
+
+ // Update next execution time for cyclic jobs
+ if jobInfo.Job.IsCyclic && jobInfo.Job.DefaultInterval
> 0 {
+ jobInfo.NextExecutionAt =
now.Add(time.Duration(jobInfo.Job.DefaultInterval) * time.Second)
+ }
+ }
}
}
+
+// RescheduleJob reschedules a cyclic job for the next execution
+func (td *TimeDispatch) RescheduleJob(job *jobtypes.Job) error {
+ if !job.IsCyclic || job.DefaultInterval <= 0 {
+ return fmt.Errorf("job is not cyclable or has invalid interval")
+ }
+
+ // Create new timer task for next execution
+ nextDelay := time.Duration(job.DefaultInterval) * time.Second
+ timerTask := NewWheelTimerTask(job, td.commonDispatcher, td, td.logger)
+
+ // Schedule the next execution
+ timeout := td.wheelTimer.NewTimeout(timerTask, nextDelay)
+
+ // Update the timeout in cyclicTasks
+ td.cyclicTasks.Store(job.ID, timeout)
+
+ // Update job info next execution time
+ if jobInfoInterface, exists := td.jobInfos.Load(job.ID); exists {
+ if jobInfo, ok := jobInfoInterface.(*JobInfo); ok {
+ jobInfo.NextExecutionAt = time.Now().Add(nextDelay)
+ jobInfo.Timeout = timeout
+ }
+ }
+
+ nextExecutionTime := time.Now().Add(nextDelay)
+ td.logger.Info("rescheduled cyclic job",
+ "jobID", job.ID,
+ "interval", job.DefaultInterval,
+ "nextExecution", nextExecutionTime,
+ "delay", nextDelay)
+
+ return nil
+}
+
+// startStatsLogging starts periodic statistics logging
+func (td *TimeDispatch) startStatsLogging(ctx context.Context) {
+ // Create a context that can be cancelled
+ statsCtx, cancel := context.WithCancel(ctx)
+ td.statsCancel = cancel
+
+ // Create ticker for 1-minute intervals
+ td.statsLogger = time.NewTicker(1 * time.Minute)
+
+ go func() {
+ defer td.statsLogger.Stop()
+
+ // Log initial statistics
+ td.logStatistics()
+
+ for {
+ select {
+ case <-td.statsLogger.C:
+ td.logStatistics()
+ case <-statsCtx.Done():
+ td.logger.Info("statistics logging stopped")
+ return
+ }
+ }
+ }()
+
+ td.logger.Info("statistics logging started", "interval", "1m")
+}
+
+// stopStatsLogging stops periodic statistics logging
+func (td *TimeDispatch) stopStatsLogging() {
+ if td.statsCancel != nil {
+ td.statsCancel()
+ td.statsCancel = nil
+ }
+ if td.statsLogger != nil {
+ td.statsLogger.Stop()
+ td.statsLogger = nil
+ }
+}
+
+// logStatistics logs current dispatcher statistics
+func (td *TimeDispatch) logStatistics() {
+ stats := td.Stats()
+
+ // Get detailed job information
+ var cyclicJobDetails []map[string]any
+ var tempJobDetails []map[string]any
+
+ td.cyclicTasks.Range(func(key, value interface{}) bool {
+ if jobID, ok := key.(int64); ok {
+ if jobInfoInterface, exists := td.jobInfos.Load(jobID);
exists {
+ if jobInfo, ok := jobInfoInterface.(*JobInfo);
ok {
+ now := time.Now()
+ detail := map[string]any{
+ "jobID": jobID,
+ "app":
jobInfo.Job.App,
+ "monitorID":
jobInfo.Job.MonitorID,
+ "interval":
jobInfo.Job.DefaultInterval,
+ "createdAt":
jobInfo.CreatedAt,
+ "lastExecuted":
jobInfo.LastExecutedAt,
+ "nextExecution":
jobInfo.NextExecutionAt,
+ "execCount":
jobInfo.ExecutionCount,
+ "timeSinceLastExec": func()
string {
+ if
jobInfo.LastExecutedAt.IsZero() {
+ return "never"
+ }
+ return
now.Sub(jobInfo.LastExecutedAt).String()
+ }(),
+ "timeToNextExec": func() string
{
+ if
jobInfo.NextExecutionAt.IsZero() {
+ return "unknown"
+ }
+ if
jobInfo.NextExecutionAt.Before(now) {
+ return "overdue"
+ }
+ return
jobInfo.NextExecutionAt.Sub(now).String()
+ }(),
+ }
+ cyclicJobDetails =
append(cyclicJobDetails, detail)
+ }
+ }
+ }
+ return true
+ })
+
+ td.tempTasks.Range(func(key, value interface{}) bool {
+ if jobID, ok := key.(int64); ok {
+ if jobInfoInterface, exists := td.jobInfos.Load(jobID);
exists {
+ if jobInfo, ok := jobInfoInterface.(*JobInfo);
ok {
+ detail := map[string]any{
+ "jobID": jobID,
+ "app":
jobInfo.Job.App,
+ "monitorID":
jobInfo.Job.MonitorID,
+ "createdAt":
jobInfo.CreatedAt,
+ "nextExecution":
jobInfo.NextExecutionAt,
+ "execCount":
jobInfo.ExecutionCount,
+ }
+ tempJobDetails = append(tempJobDetails,
detail)
+ }
+ }
+ }
+ return true
+ })
+
+ // Get timer wheel statistics
+ var timerStats map[string]interface{}
+ if td.wheelTimer != nil {
+ timerStats = td.wheelTimer.GetStats()
+ }
+
+ td.logger.Info("📊 Dispatcher Statistics Report",
+ "uptime", stats["uptime"],
+ "activeJobs", map[string]any{
+ "cyclic": stats["cyclicJobs"],
+ "temp": stats["tempJobs"],
+ "total": stats["cyclicJobs"].(int) +
stats["tempJobs"].(int),
+ },
+ "jobCounters", map[string]any{
+ "added": stats["totalJobsAdded"],
+ "removed": stats["totalJobsRemoved"],
+ "executed": stats["totalJobsExecuted"],
+ "completed": stats["totalJobsCompleted"],
+ "failed": stats["totalJobsFailed"],
+ },
+ "timerWheelConfig", timerStats,
+ "lastExecution", stats["lastExecutionTime"],
+ "cyclicJobs", cyclicJobDetails,
+ "tempJobs", tempJobDetails,
+ )
+}
diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go
b/internal/collector/common/dispatcher/wheel_timer_task.go
index 3eb53e3..657103a 100644
--- a/internal/collector/common/dispatcher/wheel_timer_task.go
+++ b/internal/collector/common/dispatcher/wheel_timer_task.go
@@ -28,19 +28,30 @@ import (
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
+// StatsRecorder interface for recording job execution statistics
+type StatsRecorder interface {
+ RecordJobExecution()
+ RecordJobCompleted()
+ RecordJobFailed()
+ UpdateJobExecution(jobID int64)
+ RescheduleJob(job *jobtypes.Job) error
+}
+
// WheelTimerTask represents a task that runs in the time wheel
// This corresponds to Java's WheelTimerTask
type WheelTimerTask struct {
job *jobtypes.Job
commonDispatcher MetricsTaskDispatcher
+ statsRecorder StatsRecorder
logger logger.Logger
}
// NewWheelTimerTask creates a new wheel timer task
-func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher
MetricsTaskDispatcher, logger logger.Logger) *WheelTimerTask {
+func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher
MetricsTaskDispatcher, statsRecorder StatsRecorder, logger logger.Logger)
*WheelTimerTask {
return &WheelTimerTask{
job: job,
commonDispatcher: commonDispatcher,
+ statsRecorder: statsRecorder,
logger: logger.WithName("wheel-timer-task"),
}
}
@@ -53,11 +64,16 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout)
error {
return fmt.Errorf("job is nil, cannot execute")
}
- wtt.logger.Info("executing wheel timer task",
+ wtt.logger.V(1).Info("executing task",
"jobID", wtt.job.ID,
- "monitorID", wtt.job.MonitorID,
"app", wtt.job.App)
+ // Record job execution start
+ if wtt.statsRecorder != nil {
+ wtt.statsRecorder.RecordJobExecution()
+ wtt.statsRecorder.UpdateJobExecution(wtt.job.ID)
+ }
+
startTime := time.Now()
// Create a context for this specific task execution
@@ -74,13 +90,23 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout)
error {
wtt.logger.Error(err, "failed to dispatch metrics task",
"jobID", wtt.job.ID,
"duration", duration)
+
+ // Record job failure
+ if wtt.statsRecorder != nil {
+ wtt.statsRecorder.RecordJobFailed()
+ }
return err
}
- wtt.logger.Info("successfully dispatched metrics task",
+ wtt.logger.V(1).Info("task completed",
"jobID", wtt.job.ID,
"duration", duration)
+ // Record job completion
+ if wtt.statsRecorder != nil {
+ wtt.statsRecorder.RecordJobCompleted()
+ }
+
// If this is a cyclic job, reschedule it for the next execution
if wtt.job.IsCyclic && wtt.job.DefaultInterval > 0 {
wtt.rescheduleJob(timeout)
@@ -91,17 +117,20 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout)
error {
// rescheduleJob reschedules a cyclic job for the next execution
func (wtt *WheelTimerTask) rescheduleJob(timeout *jobtypes.Timeout) {
- wtt.logger.Info("rescheduling cyclic job",
- "jobID", wtt.job.ID,
- "interval", wtt.job.DefaultInterval)
+ if wtt.job.DefaultInterval <= 0 {
+ wtt.logger.Info("job has no valid interval, not rescheduling",
+ "jobID", wtt.job.ID)
+ return
+ }
- // TODO: Implement rescheduling logic
- // This would involve creating a new timeout with the same job
- // and adding it back to the time wheel
- // For now, we'll log that rescheduling is needed
- wtt.logger.Info("cyclic job needs rescheduling",
- "jobID", wtt.job.ID,
- "nextExecutionIn",
time.Duration(wtt.job.DefaultInterval)*time.Second)
+ // Use the stats recorder to reschedule the job
+ if wtt.statsRecorder != nil {
+ if err := wtt.statsRecorder.RescheduleJob(wtt.job); err != nil {
+ wtt.logger.Error(err, "failed to reschedule job",
"jobID", wtt.job.ID)
+ }
+ } else {
+ wtt.logger.Error(nil, "stats recorder is nil", "jobID",
wtt.job.ID)
+ }
}
// GetJob returns the associated job
diff --git a/internal/collector/common/router/message_router.go
b/internal/collector/common/router/message_router.go
new file mode 100644
index 0000000..262dff7
--- /dev/null
+++ b/internal/collector/common/router/message_router.go
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package router
+
+import (
+ "encoding/json"
+ "fmt"
+
+ pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// MessageType constants matching Java version
+const (
+ MessageTypeHeartbeat int32 = 0
+ MessageTypeGoOnline int32 = 1
+ MessageTypeGoOffline int32 = 2
+ MessageTypeGoClose int32 = 3
+ MessageTypeIssueCyclicTask int32 = 4
+ MessageTypeDeleteCyclicTask int32 = 5
+ MessageTypeIssueOneTimeTask int32 = 6
+ MessageTypeResponseCyclicTaskData int32 = 7
+ MessageTypeResponseOneTimeTaskData int32 = 8
+ MessageTypeResponseCyclicTaskSdData int32 = 9
+)
+
+// MessageRouter is the interface for routing messages between transport and
job components
+type MessageRouter interface {
+ // RegisterProcessors registers all message processors
+ RegisterProcessors()
+
+ // SendResult sends collection results back to the manager
+ SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job)
error
+
+ // GetIdentity returns the collector identity
+ GetIdentity() string
+}
+
+// MessageRouterImpl implements the MessageRouter interface
+type MessageRouterImpl struct {
+ logger logger.Logger
+ client transport.TransportClient
+ jobRunner *job.Runner
+ identity string
+}
+
+// Config represents message router configuration
+type Config struct {
+ Logger logger.Logger
+ Client transport.TransportClient
+ JobRunner *job.Runner
+ Identity string
+}
+
+// New creates a new message router
+func New(cfg *Config) MessageRouter {
+ return &MessageRouterImpl{
+ logger: cfg.Logger.WithName("message-router"),
+ client: cfg.Client,
+ jobRunner: cfg.JobRunner,
+ identity: cfg.Identity,
+ }
+}
+
+// RegisterProcessors registers all message processors
+func (r *MessageRouterImpl) RegisterProcessors() {
+ if r.client == nil {
+ r.logger.Error(nil, "cannot register processors: client is nil")
+ return
+ }
+
+ // Register cyclic task processor
+ r.client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg
interface{}) (interface{}, error) {
+ if pbMsg, ok := msg.(*pb.Message); ok {
+ return r.handleIssueCyclicTask(pbMsg)
+ }
+ return nil, fmt.Errorf("invalid message type")
+ })
+
+ // Register delete cyclic task processor
+ r.client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg
interface{}) (interface{}, error) {
+ if pbMsg, ok := msg.(*pb.Message); ok {
+ return r.handleDeleteCyclicTask(pbMsg)
+ }
+ return nil, fmt.Errorf("invalid message type")
+ })
+
+ // Register one-time task processor
+ r.client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg
interface{}) (interface{}, error) {
+ if pbMsg, ok := msg.(*pb.Message); ok {
+ return r.handleIssueOneTimeTask(pbMsg)
+ }
+ return nil, fmt.Errorf("invalid message type")
+ })
+
+ // Other processors can be registered here
+ r.logger.Info("Successfully registered all message processors")
+}
+
+// handleIssueCyclicTask handles cyclic task messages
+func (r *MessageRouterImpl) handleIssueCyclicTask(msg *pb.Message)
(*pb.Message, error) {
+ r.logger.Info("Handling cyclic task message")
+
+ // Parse job from message
+ var job jobtypes.Job
+ if err := json.Unmarshal(msg.Msg, &job); err != nil {
+ r.logger.Error(err, "failed to unmarshal job")
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: r.identity,
+ Msg: []byte("failed to unmarshal job"),
+ }, nil
+ }
+
+ // Add job to job runner
+ if err := r.jobRunner.AddAsyncCollectJob(&job); err != nil {
+ r.logger.Error(err, "failed to add job to job runner", "jobID",
job.ID)
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: r.identity,
+ Msg: []byte(fmt.Sprintf("failed to add job: %v",
err)),
+ }, nil
+ }
+
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: r.identity,
+ Msg: []byte("job added successfully"),
+ }, nil
+}
+
+// handleDeleteCyclicTask handles delete cyclic task messages
+func (r *MessageRouterImpl) handleDeleteCyclicTask(msg *pb.Message)
(*pb.Message, error) {
+ r.logger.Info("Handling delete cyclic task message")
+
+ // Parse job IDs from message
+ var jobIDs []int64
+ if err := json.Unmarshal(msg.Msg, &jobIDs); err != nil {
+ r.logger.Error(err, "failed to unmarshal job IDs")
+ return &pb.Message{
+ Type: pb.MessageType_DELETE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: r.identity,
+ Msg: []byte("failed to unmarshal job IDs"),
+ }, nil
+ }
+
+ // Remove jobs from job runner
+ for _, jobID := range jobIDs {
+ if err := r.jobRunner.RemoveAsyncCollectJob(jobID); err != nil {
+ r.logger.Error(err, "failed to remove job from job
runner", "jobID", jobID)
+ // Continue with other jobs even if one fails
+ }
+ }
+
+ return &pb.Message{
+ Type: pb.MessageType_DELETE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: r.identity,
+ Msg: []byte("jobs removed successfully"),
+ }, nil
+}
+
+// handleIssueOneTimeTask handles one-time task messages
+func (r *MessageRouterImpl) handleIssueOneTimeTask(msg *pb.Message)
(*pb.Message, error) {
+ r.logger.Info("Handling one-time task message")
+
+ // Parse job from message
+ var job jobtypes.Job
+ if err := json.Unmarshal(msg.Msg, &job); err != nil {
+ r.logger.Error(err, "failed to unmarshal job")
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_ONE_TIME_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: r.identity,
+ Msg: []byte("failed to unmarshal job"),
+ }, nil
+ }
+
+ // Set job as non-cyclic
+ job.IsCyclic = false
+
+ // Add job to job runner
+ if err := r.jobRunner.AddAsyncCollectJob(&job); err != nil {
+ r.logger.Error(err, "failed to add one-time job to job runner",
"jobID", job.ID)
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_ONE_TIME_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: r.identity,
+ Msg: []byte(fmt.Sprintf("failed to add one-time
job: %v", err)),
+ }, nil
+ }
+
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_ONE_TIME_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: r.identity,
+ Msg: []byte("one-time job added successfully"),
+ }, nil
+}
+
+// SendResult sends collection results back to the manager
+func (r *MessageRouterImpl) SendResult(data *jobtypes.CollectRepMetricsData,
job *jobtypes.Job) error {
+ if r.client == nil || !r.client.IsStarted() {
+ return fmt.Errorf("transport client not started")
+ }
+
+ // Determine message type based on job type
+ var msgType pb.MessageType
+ if job.IsCyclic {
+ msgType = pb.MessageType_RESPONSE_CYCLIC_TASK_DATA
+ } else {
+ msgType = pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA
+ }
+
+ // Serialize metrics data
+ dataBytes, err := json.Marshal([]jobtypes.CollectRepMetricsData{*data})
+ if err != nil {
+ return fmt.Errorf("failed to marshal metrics data: %w", err)
+ }
+
+ // Create message
+ msg := &pb.Message{
+ Type: msgType,
+ Direction: pb.Direction_REQUEST,
+ Identity: r.identity,
+ Msg: dataBytes,
+ }
+
+ // Send message
+ if err := r.client.SendMsg(msg); err != nil {
+ return fmt.Errorf("failed to send metrics data: %w", err)
+ }
+
+ r.logger.Info("Successfully sent metrics data",
+ "jobID", job.ID,
+ "metricsName", data.Metrics,
+ "isCyclic", job.IsCyclic)
+
+ return nil
+}
+
+// GetIdentity returns the collector identity
+func (r *MessageRouterImpl) GetIdentity() string {
+ return r.identity
+}
diff --git a/internal/collector/common/transport/transport.go
b/internal/collector/common/transport/transport.go
index ee1da89..44058e3 100644
--- a/internal/collector/common/transport/transport.go
+++ b/internal/collector/common/transport/transport.go
@@ -45,8 +45,9 @@ const (
)
type Runner struct {
- Config *configtypes.CollectorConfig
- client transport.TransportClient
+ Config *configtypes.CollectorConfig
+ client transport.TransportClient
+ jobScheduler transport.JobScheduler
clrserver.Server
}
@@ -59,6 +60,11 @@ func New(cfg *configtypes.CollectorConfig) *Runner {
}
}
+// SetJobScheduler sets the job scheduler for the transport runner
+func (r *Runner) SetJobScheduler(scheduler transport.JobScheduler) {
+ r.jobScheduler = scheduler
+}
+
// NewFromConfig creates a new transport runner from collector configuration
func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner {
if cfg == nil {
@@ -150,7 +156,14 @@ func (r *Runner) Start(ctx context.Context) error {
r.Logger.Error(event.Error, "Failed to connect
to manager gRPC server", "addr", event.Address)
}
})
- transport.RegisterDefaultProcessors(c)
+ // Register processors with job scheduler
+ if r.jobScheduler != nil {
+ transport.RegisterDefaultProcessors(c, r.jobScheduler)
+ r.Logger.Info("Registered gRPC processors with job
scheduler")
+ } else {
+ transport.RegisterDefaultProcessors(c, nil)
+ r.Logger.Info("Registered gRPC processors without job
scheduler")
+ }
case *transport.NettyClient:
c.SetEventHandler(func(event transport.Event) {
switch event.Type {
@@ -163,7 +176,14 @@ func (r *Runner) Start(ctx context.Context) error {
r.Logger.Error(event.Error, "Failed to connect
to manager netty server", "addr", event.Address)
}
})
- transport.RegisterDefaultNettyProcessors(c)
+ // Register processors with job scheduler
+ if r.jobScheduler != nil {
+ transport.RegisterDefaultNettyProcessors(c,
r.jobScheduler)
+ r.Logger.Info("Registered netty processors with job
scheduler")
+ } else {
+ transport.RegisterDefaultNettyProcessors(c, nil)
+ r.Logger.Info("Registered netty processors without job
scheduler")
+ }
}
if err := r.client.Start(); err != nil {
diff --git a/internal/collector/common/types/job/job_types.go
b/internal/collector/common/types/job/job_types.go
index 28478b8..70c3c10 100644
--- a/internal/collector/common/types/job/job_types.go
+++ b/internal/collector/common/types/job/job_types.go
@@ -30,9 +30,9 @@ type Job struct {
Hide bool `json:"hide"`
Category string `json:"category"`
App string `json:"app"`
- Name map[string]string `json:"name"`
- Help map[string]string `json:"help"`
- HelpLink map[string]string `json:"helpLink"`
+ Name interface{} `json:"name"` // Can be
string or map[string]string for i18n
+ Help interface{} `json:"help"` // Can be
string or map[string]string for i18n
+ HelpLink interface{} `json:"helpLink"` // Can be
string or map[string]string for i18n
Timestamp int64 `json:"timestamp"`
DefaultInterval int64 `json:"defaultInterval"`
Intervals []int64 `json:"intervals"`
@@ -41,7 +41,10 @@ type Job struct {
Metrics []Metrics `json:"metrics"`
Configmap []Configmap `json:"configmap"`
IsSd bool `json:"isSd"`
+ Sd bool `json:"sd"`
PrometheusProxyMode bool `json:"prometheusProxyMode"`
+ Cyclic bool `json:"cyclic"`
+ Interval int64 `json:"interval"`
// Internal fields
EnvConfigmaps map[string]Configmap `json:"-"`
diff --git a/internal/collector/common/types/job/metrics_types.go
b/internal/collector/common/types/job/metrics_types.go
index 6127238..04f035e 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/collector/common/types/job/metrics_types.go
@@ -17,29 +17,37 @@
package job
-import "time"
+import (
+ "fmt"
+ "time"
+)
// Metrics represents a metric configuration
type Metrics struct {
Name string `json:"name"`
+ I18n interface{} `json:"i18n,omitempty"` //
Internationalization info
Priority int `json:"priority"`
+ CollectTime int64 `json:"collectTime"`
+ Interval int64 `json:"interval"`
+ Visible *bool `json:"visible"`
Fields []Field `json:"fields"`
Aliasfields []string `json:"aliasfields"`
- Calculates []Calculate `json:"calculates"`
- Units []Unit `json:"units"`
+ AliasFields []string `json:"aliasFields"` // Alternative field
name
+ Calculates interface{} `json:"calculates"` // Can be
[]Calculate or []string
+ Filters interface{} `json:"filters"`
+ Units interface{} `json:"units"` // Can be []Unit or
[]string
Protocol string `json:"protocol"`
Host string `json:"host"`
Port string `json:"port"`
Timeout string `json:"timeout"`
- Interval int64 `json:"interval"`
Range string `json:"range"`
- Visible *bool `json:"visible"`
ConfigMap map[string]string `json:"configMap"`
+ HasSubTask bool `json:"hasSubTask"`
// Protocol specific fields
HTTP *HTTPProtocol `json:"http,omitempty"`
SSH *SSHProtocol `json:"ssh,omitempty"`
- JDBC *JDBCProtocol `json:"jdbc,omitempty"`
+ JDBC interface{} `json:"jdbc,omitempty"` // Can be JDBCProtocol
or map[string]interface{}
SNMP *SNMPProtocol `json:"snmp,omitempty"`
JMX *JMXProtocol `json:"jmx,omitempty"`
Redis *RedisProtocol `json:"redis,omitempty"`
@@ -54,6 +62,7 @@ type Field struct {
Unit string `json:"unit"`
Instance bool `json:"instance"`
Value interface{} `json:"value"`
+ I18n interface{} `json:"i18n,omitempty"` // Internationalization
info
}
// Calculate represents a calculation configuration
@@ -71,8 +80,10 @@ type Unit struct {
// ParamDefine represents a parameter definition
type ParamDefine struct {
+ ID interface{} `json:"id"`
+ App interface{} `json:"app"`
Field string `json:"field"`
- Name string `json:"name"`
+ Name interface{} `json:"name"` // Can be string or
map[string]string for i18n
Type string `json:"type"`
Required bool `json:"required"`
DefaultValue interface{} `json:"defaultValue"`
@@ -80,8 +91,38 @@ type ParamDefine struct {
Range string `json:"range"`
Limit int `json:"limit"`
Options []Option `json:"options"`
- Depend *Depend `json:"depend"`
+ KeyAlias interface{} `json:"keyAlias"`
+ ValueAlias interface{} `json:"valueAlias"`
Hide bool `json:"hide"`
+ Creator interface{} `json:"creator"`
+ Modifier interface{} `json:"modifier"`
+ GmtCreate interface{} `json:"gmtCreate"`
+ GmtUpdate interface{} `json:"gmtUpdate"`
+ Depend *Depend `json:"depend"`
+}
+
+// GetName returns the name as string, handling both string and i18n map cases
+func (p *ParamDefine) GetName() string {
+ switch v := p.Name.(type) {
+ case string:
+ return v
+ case map[string]interface{}:
+ // Try to get English name first, then any available language
+ if name, ok := v["en"]; ok {
+ if s, ok := name.(string); ok {
+ return s
+ }
+ }
+ // Fall back to first available name
+ for _, val := range v {
+ if s, ok := val.(string); ok {
+ return s
+ }
+ }
+ return "unknown"
+ default:
+ return fmt.Sprintf("%v", v)
+ }
}
// Option represents a parameter option
@@ -156,18 +197,18 @@ type SSHProtocol struct {
// JDBCProtocol represents JDBC protocol configuration
type JDBCProtocol struct {
- Host string `json:"host"`
- Port string `json:"port"`
- Platform string `json:"platform"`
- Username string `json:"username"`
- Password string `json:"password"`
- Database string `json:"database"`
- Timeout string `json:"timeout"`
- QueryType string `json:"queryType"`
- SQL string `json:"sql"`
- URL string `json:"url"`
- ReuseConnection string `json:"reuseConnection"`
- SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"`
+ Host string `json:"host"`
+ Port string `json:"port"`
+ Platform string `json:"platform"`
+ Username string `json:"username"`
+ Password string `json:"password"`
+ Database string `json:"database"`
+ Timeout string `json:"timeout"`
+ QueryType string `json:"queryType"`
+ SQL string `json:"sql"`
+ URL string `json:"url"`
+ ReuseConnection string `json:"reuseConnection"`
+ SSHTunnel map[string]interface{} `json:"sshTunnel,omitempty"`
}
// SNMPProtocol represents SNMP protocol configuration
@@ -270,7 +311,35 @@ func (j *Job) Clone() *Job {
if j.Metrics != nil {
clone.Metrics = make([]Metrics, len(j.Metrics))
- copy(clone.Metrics, j.Metrics)
+ for i, metric := range j.Metrics {
+ clone.Metrics[i] = metric
+
+ // Deep copy ConfigMap for each metric to avoid
concurrent access
+ if metric.ConfigMap != nil {
+ clone.Metrics[i].ConfigMap =
make(map[string]string, len(metric.ConfigMap))
+ for k, v := range metric.ConfigMap {
+ clone.Metrics[i].ConfigMap[k] = v
+ }
+ }
+
+ // Deep copy Fields slice
+ if metric.Fields != nil {
+ clone.Metrics[i].Fields = make([]Field,
len(metric.Fields))
+ copy(clone.Metrics[i].Fields, metric.Fields)
+ }
+
+ // Deep copy Aliasfields slice
+ if metric.Aliasfields != nil {
+ clone.Metrics[i].Aliasfields = make([]string,
len(metric.Aliasfields))
+ copy(clone.Metrics[i].Aliasfields,
metric.Aliasfields)
+ }
+
+ // Deep copy AliasFields slice
+ if metric.AliasFields != nil {
+ clone.Metrics[i].AliasFields = make([]string,
len(metric.AliasFields))
+ copy(clone.Metrics[i].AliasFields,
metric.AliasFields)
+ }
+ }
}
if j.Configmap != nil {
diff --git a/internal/transport/netty_client.go
b/internal/transport/netty_client.go
index d598149..ebaeade 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -503,7 +503,7 @@ func (f *TransportClientFactory) CreateClient(protocol,
addr string) (TransportC
}
// RegisterDefaultProcessors registers all default message processors for
Netty client
-func RegisterDefaultNettyProcessors(client *NettyClient) {
+func RegisterDefaultNettyProcessors(client *NettyClient, scheduler
JobScheduler) {
client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{})
(interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
processor := &HeartbeatProcessor{}
@@ -538,7 +538,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg
interface{}) (interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
- processor := &CollectCyclicDataProcessor{client: nil}
+ processor := &CollectCyclicDataProcessor{client: nil,
scheduler: scheduler}
return processor.Process(pbMsg)
}
return nil, fmt.Errorf("invalid message type")
@@ -546,7 +546,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg
interface{}) (interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
- processor := &DeleteCyclicTaskProcessor{client: nil}
+ processor := &DeleteCyclicTaskProcessor{client: nil,
scheduler: scheduler}
return processor.Process(pbMsg)
}
return nil, fmt.Errorf("invalid message type")
@@ -554,7 +554,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg
interface{}) (interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
- processor := &CollectOneTimeDataProcessor{client: nil}
+ processor := &CollectOneTimeDataProcessor{client: nil,
scheduler: scheduler}
return processor.Process(pbMsg)
}
return nil, fmt.Errorf("invalid message type")
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index 8bd71e3..b9df9a3 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -18,11 +18,18 @@
package transport
import (
+ "encoding/json"
"fmt"
"log"
+ "os"
"time"
pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
)
// Message type constants matching Java version
@@ -44,6 +51,23 @@ type MessageProcessor interface {
Process(msg *pb.Message) (*pb.Message, error)
}
+// JobScheduler defines the interface for job scheduling operations
+// This interface allows transport layer to interact with job scheduling
without direct dependencies
+type JobScheduler interface {
+ // AddAsyncCollectJob adds a job to async collection scheduling
+ AddAsyncCollectJob(job *jobtypes.Job) error
+
+ // RemoveAsyncCollectJob removes a job from scheduling
+ RemoveAsyncCollectJob(jobID int64) error
+}
+
+// preprocessJobPasswords is a helper function to decrypt passwords in job
configmap
+// This should be called once when receiving a job to avoid repeated decryption
+func preprocessJobPasswords(job *jobtypes.Job) error {
+ replacer := param.NewReplacer()
+ return replacer.PreprocessJobPasswords(job)
+}
+
// HeartbeatProcessor handles heartbeat messages
type HeartbeatProcessor struct{}
@@ -63,7 +87,31 @@ func NewGoOnlineProcessor(client *GrpcClient)
*GoOnlineProcessor {
}
func (p *GoOnlineProcessor) Process(msg *pb.Message) (*pb.Message, error) {
- // Handle go online message
+ log := logger.DefaultLogger(os.Stdout,
loggertype.LogLevelInfo).WithName("go-online-processor")
+
+ // Handle go online message - parse ServerInfo and extract AES secret
+ log.Info("received GO_ONLINE message from manager")
+
+ if len(msg.Msg) == 0 {
+ log.Info("empty message from server, please upgrade server")
+ } else {
+ // Parse ServerInfo from message (matches Java:
JsonUtil.fromJson(message.getMsg().toStringUtf8(), ServerInfo.class))
+ var serverInfo struct {
+ AesSecret string `json:"aesSecret"`
+ }
+
+ if err := json.Unmarshal(msg.Msg, &serverInfo); err != nil {
+ log.Error(err, "failed to parse ServerInfo from
manager")
+ } else if serverInfo.AesSecret == "" {
+ log.Info("server response has empty secret, please
check configuration")
+ } else {
+ // Set the AES secret key globally (matches Java:
AesUtil.setDefaultSecretKey(serverInfo.getAesSecret()))
+ log.Info("received AES secret from manager",
"keyLength", len(serverInfo.AesSecret))
+ crypto.SetDefaultSecretKey(serverInfo.AesSecret)
+ }
+ }
+
+ log.Info("online message processed successfully")
return &pb.Message{
Type: pb.MessageType_GO_ONLINE,
Direction: pb.Direction_RESPONSE,
@@ -128,65 +176,238 @@ func (p *GoCloseProcessor) Process(msg *pb.Message)
(*pb.Message, error) {
// CollectCyclicDataProcessor handles cyclic task messages
type CollectCyclicDataProcessor struct {
- client *GrpcClient
+ client *GrpcClient
+ scheduler JobScheduler
}
-func NewCollectCyclicDataProcessor(client *GrpcClient)
*CollectCyclicDataProcessor {
- return &CollectCyclicDataProcessor{client: client}
+func NewCollectCyclicDataProcessor(client *GrpcClient, scheduler JobScheduler)
*CollectCyclicDataProcessor {
+ return &CollectCyclicDataProcessor{
+ client: client,
+ scheduler: scheduler,
+ }
}
func (p *CollectCyclicDataProcessor) Process(msg *pb.Message) (*pb.Message,
error) {
- // Handle cyclic task message
- // TODO: Implement actual task processing logic
+ log := logger.DefaultLogger(os.Stdout,
loggertype.LogLevelInfo).WithName("cyclic-task-processor")
+ log.Info("processing cyclic task message", "identity", msg.Identity)
+
+ if p.scheduler == nil {
+ log.Info("no job scheduler available, cannot process cyclic
task")
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: msg.Identity,
+ Msg: []byte("no job scheduler available"),
+ }, nil
+ }
+
+ // Parse job from message
+ log.Info("Received cyclic task JSON: %s", string(msg.Msg))
+
+ var job jobtypes.Job
+ if err := json.Unmarshal(msg.Msg, &job); err != nil {
+ log.Error(err, "Failed to unmarshal job from cyclic task
message: %v")
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: msg.Identity,
+ Msg: []byte(fmt.Sprintf("failed to parse job:
%v", err)),
+ }, nil
+ }
+
+ // Preprocess passwords once (decrypt encrypted passwords in configmap)
+ // This avoids repeated decryption during each task execution
+ if err := preprocessJobPasswords(&job); err != nil {
+ log.Error(err, "Failed to preprocess job passwords")
+ }
+
+ // Ensure job is marked as cyclic
+ job.IsCyclic = true
+
+ log.Info("Adding cyclic job to scheduler",
+ "jobID", job.ID,
+ "monitorID", job.MonitorID,
+ "app", job.App)
+
+ // Add job to scheduler
+ if err := p.scheduler.AddAsyncCollectJob(&job); err != nil {
+ log.Error(err, "Failed to add cyclic job to scheduler: %v")
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: msg.Identity,
+ Msg: []byte(fmt.Sprintf("failed to schedule job:
%v", err)),
+ }, nil
+ }
+
return &pb.Message{
Type: pb.MessageType_ISSUE_CYCLIC_TASK,
Direction: pb.Direction_RESPONSE,
Identity: msg.Identity,
- Msg: []byte("cyclic task ack"),
+ Msg: []byte("cyclic task scheduled successfully"),
}, nil
}
// DeleteCyclicTaskProcessor handles delete cyclic task messages
type DeleteCyclicTaskProcessor struct {
- client *GrpcClient
+ client *GrpcClient
+ scheduler JobScheduler
}
-func NewDeleteCyclicTaskProcessor(client *GrpcClient)
*DeleteCyclicTaskProcessor {
- return &DeleteCyclicTaskProcessor{client: client}
+func NewDeleteCyclicTaskProcessor(client *GrpcClient, scheduler JobScheduler)
*DeleteCyclicTaskProcessor {
+ return &DeleteCyclicTaskProcessor{
+ client: client,
+ scheduler: scheduler,
+ }
}
func (p *DeleteCyclicTaskProcessor) Process(msg *pb.Message) (*pb.Message,
error) {
- // Handle delete cyclic task message
+ log.Printf("Processing delete cyclic task message, identity: %s",
msg.Identity)
+
+ if p.scheduler == nil {
+ log.Printf("No job scheduler available, cannot process delete
cyclic task")
+ return &pb.Message{
+ Type: pb.MessageType_DELETE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: msg.Identity,
+ Msg: []byte("no job scheduler available"),
+ }, nil
+ }
+
+ // Parse job IDs from message - could be a single job ID or array of IDs
+ var jobIDs []int64
+
+ // Try to parse as array first
+ if err := json.Unmarshal(msg.Msg, &jobIDs); err != nil {
+ // If array parsing fails, try single job ID
+ var singleJobID int64
+ if err2 := json.Unmarshal(msg.Msg, &singleJobID); err2 != nil {
+ log.Printf("Failed to unmarshal job IDs from delete
message: %v, %v", err, err2)
+ return &pb.Message{
+ Type: pb.MessageType_DELETE_CYCLIC_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: msg.Identity,
+ Msg: []byte(fmt.Sprintf("failed to parse
job IDs: %v", err)),
+ }, nil
+ }
+ jobIDs = []int64{singleJobID}
+ }
+
+ log.Printf("Removing %d cyclic jobs from scheduler: %v", len(jobIDs),
jobIDs)
+
+ // Remove jobs from scheduler
+ var errors []string
+ for _, jobID := range jobIDs {
+ if err := p.scheduler.RemoveAsyncCollectJob(jobID); err != nil {
+ log.Printf("Failed to remove job %d from scheduler:
%v", jobID, err)
+ errors = append(errors, fmt.Sprintf("job %d: %v",
jobID, err))
+ } else {
+ log.Printf("Successfully removed job %d from
scheduler", jobID)
+ }
+ }
+
+ // Prepare response message
+ var responseMsg string
+ if len(errors) > 0 {
+ responseMsg = fmt.Sprintf("partially completed, errors: %v",
errors)
+ } else {
+ responseMsg = "all cyclic tasks removed successfully"
+ }
+
return &pb.Message{
Type: pb.MessageType_DELETE_CYCLIC_TASK,
Direction: pb.Direction_RESPONSE,
Identity: msg.Identity,
- Msg: []byte("delete cyclic task ack"),
+ Msg: []byte(responseMsg),
}, nil
}
// CollectOneTimeDataProcessor handles one-time task messages
type CollectOneTimeDataProcessor struct {
- client *GrpcClient
+ client *GrpcClient
+ scheduler JobScheduler
}
-func NewCollectOneTimeDataProcessor(client *GrpcClient)
*CollectOneTimeDataProcessor {
- return &CollectOneTimeDataProcessor{client: client}
+func NewCollectOneTimeDataProcessor(client *GrpcClient, scheduler
JobScheduler) *CollectOneTimeDataProcessor {
+ return &CollectOneTimeDataProcessor{
+ client: client,
+ scheduler: scheduler,
+ }
}
func (p *CollectOneTimeDataProcessor) Process(msg *pb.Message) (*pb.Message,
error) {
- // Handle one-time task message
- // TODO: Implement actual task processing logic
+ log.Printf("Processing one-time task message, identity: %s",
msg.Identity)
+
+ if p.scheduler == nil {
+ log.Printf("No job scheduler available, cannot process one-time
task")
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_ONE_TIME_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: msg.Identity,
+ Msg: []byte("no job scheduler available"),
+ }, nil
+ }
+
+ // Parse job from message
+ log.Printf("Received one-time task JSON: %s", string(msg.Msg))
+
+ // Parse JSON to check interval values before unmarshaling
+ var rawJob map[string]interface{}
+ if err := json.Unmarshal(msg.Msg, &rawJob); err == nil {
+ if defaultInterval, ok := rawJob["defaultInterval"]; ok {
+ log.Printf("DEBUG: Raw defaultInterval from JSON: %v
(type: %T)", defaultInterval, defaultInterval)
+ }
+ if interval, ok := rawJob["interval"]; ok {
+ log.Printf("DEBUG: Raw interval from JSON: %v (type:
%T)", interval, interval)
+ }
+ }
+
+ var job jobtypes.Job
+ if err := json.Unmarshal(msg.Msg, &job); err != nil {
+ log.Printf("Failed to unmarshal job from one-time task message:
%v", err)
+ log.Printf("JSON content: %s", string(msg.Msg))
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_ONE_TIME_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: msg.Identity,
+ Msg: []byte(fmt.Sprintf("failed to parse job:
%v", err)),
+ }, nil
+ }
+
+ // Preprocess passwords once (decrypt encrypted passwords in configmap)
+ // This avoids repeated decryption during each task execution
+ if err := preprocessJobPasswords(&job); err != nil {
+ log.Printf("Failed to preprocess job passwords: %v", err)
+ }
+
+ // Ensure job is marked as non-cyclic
+ job.IsCyclic = false
+
+ log.Printf("Adding one-time job to scheduler: jobID=%d, monitorID=%d,
app=%s",
+ job.ID, job.MonitorID, job.App)
+
+ // Add job to scheduler
+ if err := p.scheduler.AddAsyncCollectJob(&job); err != nil {
+ log.Printf("Failed to add one-time job to scheduler: %v", err)
+ return &pb.Message{
+ Type: pb.MessageType_ISSUE_ONE_TIME_TASK,
+ Direction: pb.Direction_RESPONSE,
+ Identity: msg.Identity,
+ Msg: []byte(fmt.Sprintf("failed to schedule job:
%v", err)),
+ }, nil
+ }
+
+ log.Printf("Successfully scheduled one-time job: jobID=%d", job.ID)
return &pb.Message{
Type: pb.MessageType_ISSUE_ONE_TIME_TASK,
Direction: pb.Direction_RESPONSE,
Identity: msg.Identity,
- Msg: []byte("one-time task ack"),
+ Msg: []byte("one-time task scheduled successfully"),
}, nil
}
// RegisterDefaultProcessors registers all default message processors
-func RegisterDefaultProcessors(client *GrpcClient) {
+func RegisterDefaultProcessors(client *GrpcClient, scheduler JobScheduler) {
client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{})
(interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
processor := &HeartbeatProcessor{}
@@ -221,7 +442,7 @@ func RegisterDefaultProcessors(client *GrpcClient) {
client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg
interface{}) (interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
- processor := NewCollectCyclicDataProcessor(client)
+ processor := NewCollectCyclicDataProcessor(client,
scheduler)
return processor.Process(pbMsg)
}
return nil, fmt.Errorf("invalid message type")
@@ -229,7 +450,7 @@ func RegisterDefaultProcessors(client *GrpcClient) {
client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg
interface{}) (interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
- processor := NewDeleteCyclicTaskProcessor(client)
+ processor := NewDeleteCyclicTaskProcessor(client,
scheduler)
return processor.Process(pbMsg)
}
return nil, fmt.Errorf("invalid message type")
@@ -237,7 +458,7 @@ func RegisterDefaultProcessors(client *GrpcClient) {
client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg
interface{}) (interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
- processor := NewCollectOneTimeDataProcessor(client)
+ processor := NewCollectOneTimeDataProcessor(client,
scheduler)
return processor.Process(pbMsg)
}
return nil, fmt.Errorf("invalid message type")
diff --git a/internal/util/crypto/aes_util.go b/internal/util/crypto/aes_util.go
new file mode 100644
index 0000000..678760f
--- /dev/null
+++ b/internal/util/crypto/aes_util.go
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package crypto
+
+import (
+ "crypto/aes"
+ "crypto/cipher"
+ "encoding/base64"
+ "fmt"
+ "os"
+ "sync"
+ "unicode"
+
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// AESUtil provides AES encryption/decryption utilities
+// This matches Java's AesUtil functionality
+type AESUtil struct {
+ secretKey string
+ mutex sync.RWMutex
+}
+
+var (
+ defaultAESUtil = &AESUtil{}
+ once sync.Once
+ log = logger.DefaultLogger(os.Stdout,
loggertype.LogLevelInfo).WithName("aes-util")
+)
+
+// GetDefaultAESUtil returns the singleton AES utility instance
+func GetDefaultAESUtil() *AESUtil {
+ once.Do(func() {
+ defaultAESUtil = &AESUtil{}
+ })
+ return defaultAESUtil
+}
+
+// SetDefaultSecretKey sets the default AES secret key (matches Java:
AesUtil.setDefaultSecretKey)
+func SetDefaultSecretKey(secretKey string) {
+ GetDefaultAESUtil().SetSecretKey(secretKey)
+}
+
+// GetDefaultSecretKey gets the current AES secret key (matches Java:
AesUtil.getDefaultSecretKey)
+func GetDefaultSecretKey() string {
+ return GetDefaultAESUtil().GetSecretKey()
+}
+
+// SetSecretKey sets the AES secret key for this instance
+func (a *AESUtil) SetSecretKey(secretKey string) {
+ a.mutex.Lock()
+ defer a.mutex.Unlock()
+ a.secretKey = secretKey
+ log.Info("AES secret key updated", "keyLength", len(secretKey))
+}
+
+// GetSecretKey gets the AES secret key for this instance
+func (a *AESUtil) GetSecretKey() string {
+ a.mutex.RLock()
+ defer a.mutex.RUnlock()
+ return a.secretKey
+}
+
+// AesDecode decrypts AES encrypted data (matches Java: AesUtil.aesDecode)
+func (a *AESUtil) AesDecode(encryptedData string) (string, error) {
+ secretKey := a.GetSecretKey()
+ if secretKey == "" {
+ return "", fmt.Errorf("AES secret key not set")
+ }
+ return a.AesDecodeWithKey(encryptedData, secretKey)
+}
+
+// AesDecodeWithKey decrypts AES encrypted data with specified key
+func (a *AESUtil) AesDecodeWithKey(encryptedData, key string) (string, error) {
+ // Ensure key is exactly 16 bytes for AES-128 (Java requirement)
+ keyBytes := []byte(key)
+ if len(keyBytes) != 16 {
+ return "", fmt.Errorf("key must be exactly 16 bytes, got %d",
len(keyBytes))
+ }
+
+ // Decode base64 encoded data
+ encryptedBytes, err := base64.StdEncoding.DecodeString(encryptedData)
+ if err != nil {
+ return "", fmt.Errorf("failed to decode base64: %w", err)
+ }
+
+ // Create AES cipher
+ block, err := aes.NewCipher(keyBytes)
+ if err != nil {
+ return "", fmt.Errorf("failed to create AES cipher: %w", err)
+ }
+
+ // Check if data length is valid for AES
+ if len(encryptedBytes) < aes.BlockSize ||
len(encryptedBytes)%aes.BlockSize != 0 {
+ return "", fmt.Errorf("invalid encrypted data length: %d",
len(encryptedBytes))
+ }
+
+ // Java uses the key itself as IV (this matches Java's implementation)
+ iv := keyBytes
+
+ // Create CBC decrypter
+ mode := cipher.NewCBCDecrypter(block, iv)
+
+ // Decrypt
+ decrypted := make([]byte, len(encryptedBytes))
+ mode.CryptBlocks(decrypted, encryptedBytes)
+
+ // Remove PKCS5/PKCS7 padding (Go's PKCS7 is compatible with Java's
PKCS5 for AES)
+ decrypted, err = removePKCS7Padding(decrypted)
+ if err != nil {
+ return "", fmt.Errorf("failed to remove padding: %w", err)
+ }
+
+ return string(decrypted), nil
+}
+
+// IsCiphertext determines whether text is encrypted (matches Java:
AesUtil.isCiphertext)
+func (a *AESUtil) IsCiphertext(text string) bool {
+ // First check if it's valid base64
+ if _, err := base64.StdEncoding.DecodeString(text); err != nil {
+ return false
+ }
+
+ // Try to decrypt and see if it succeeds
+ _, err := a.AesDecode(text)
+ return err == nil
+}
+
+// removePKCS7Padding removes PKCS7 padding from decrypted data
+func removePKCS7Padding(data []byte) ([]byte, error) {
+ if len(data) == 0 {
+ return data, nil
+ }
+
+ paddingLen := int(data[len(data)-1])
+ if paddingLen > len(data) || paddingLen == 0 {
+ return data, nil // Return as-is if padding seems invalid
+ }
+
+ // Verify padding
+ for i := 0; i < paddingLen; i++ {
+ if data[len(data)-1-i] != byte(paddingLen) {
+ return data, nil // Return as-is if padding is invalid
+ }
+ }
+
+ return data[:len(data)-paddingLen], nil
+}
+
+// isPrintableString checks if a string contains only printable characters
+func isPrintableString(s string) bool {
+ if len(s) == 0 {
+ return false
+ }
+
+ for _, r := range s {
+ if !unicode.IsPrint(r) && !unicode.IsSpace(r) {
+ return false
+ }
+ }
+ return true
+}
+
+// Convenience functions that match Java's static methods
+
+// AesDecode decrypts with the default secret key
+func AesDecode(encryptedData string) (string, error) {
+ return GetDefaultAESUtil().AesDecode(encryptedData)
+}
+
+// AesDecodeWithKey decrypts with specified key
+func AesDecodeWithKey(encryptedData, key string) (string, error) {
+ return GetDefaultAESUtil().AesDecodeWithKey(encryptedData, key)
+}
+
+// IsCiphertext checks if text is encrypted using default key
+func IsCiphertext(text string) bool {
+ return GetDefaultAESUtil().IsCiphertext(text)
+}
diff --git a/internal/util/param/param_replacer.go
b/internal/util/param/param_replacer.go
new file mode 100644
index 0000000..ff6a944
--- /dev/null
+++ b/internal/util/param/param_replacer.go
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package param
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "strconv"
+ "strings"
+
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// Replacer is a standalone utility for parameter replacement
+// This matches Java's parameter substitution mechanism where ^_^paramName^_^
gets replaced with actual values
+type Replacer struct{}
+
+// NewReplacer creates a new parameter replacer
+func NewReplacer() *Replacer {
+ return &Replacer{}
+}
+
+// PreprocessJobPasswords decrypts passwords in job configmap once during job
creation
+// This permanently replaces encrypted passwords with decrypted ones in the
job configmap
+func (r *Replacer) PreprocessJobPasswords(job *jobtypes.Job) error {
+ if job == nil || job.Configmap == nil {
+ return nil
+ }
+
+ log := logger.DefaultLogger(os.Stdout,
loggertype.LogLevelDebug).WithName("password-preprocessor")
+
+ // Decrypt passwords in configmap once and permanently replace them
+ for i := range job.Configmap {
+ config := &job.Configmap[i] // Get pointer to modify in place
+ if config.Type == 2 { // password type
+ if encryptedValue, ok := config.Value.(string); ok {
+ log.Sugar().Debugf("preprocessing encrypted
password for key: %s", config.Key)
+ if decoded, err :=
r.decryptPassword(encryptedValue); err == nil {
+ log.Info("password preprocessing
successful", "key", config.Key, "length", len(decoded))
+ config.Value = decoded // Permanently
replace encrypted value with decrypted value
+ config.Type = 1 // Change type
to string since it's now decrypted
+ } else {
+ log.Error(err, "password preprocessing
failed, keeping original value", "key", config.Key)
+ }
+ }
+ }
+ }
+ return nil
+}
+
+// ReplaceJobParams replaces all parameter placeholders in job configuration
+func (r *Replacer) ReplaceJobParams(job *jobtypes.Job) (*jobtypes.Job, error) {
+ if job == nil {
+ return nil, fmt.Errorf("job is nil")
+ }
+
+ // Create parameter map from configmap (passwords should already be
decrypted)
+ paramMap := r.createParamMapSimple(job.Configmap)
+
+ // Clone the job to avoid modifying original
+ clonedJob := job.Clone()
+ if clonedJob == nil {
+ return nil, fmt.Errorf("failed to clone job")
+ }
+
+ // Replace parameters in all metrics
+ for i := range clonedJob.Metrics {
+ if err := r.replaceMetricsParams(&clonedJob.Metrics[i],
paramMap); err != nil {
+ return nil, fmt.Errorf("failed to replace params in
metrics %s: %w", clonedJob.Metrics[i].Name, err)
+ }
+ }
+
+ return clonedJob, nil
+}
+
+// createParamMapSimple creates a parameter map from configmap entries
+// Assumes passwords have already been decrypted by PreprocessJobPasswords
+func (r *Replacer) createParamMapSimple(configmap []jobtypes.Configmap)
map[string]string {
+ paramMap := make(map[string]string)
+
+ for _, config := range configmap {
+ // Convert value to string, handle null values as empty strings
+ var strValue string
+ if config.Value == nil {
+ strValue = "" // null values become empty strings
+ } else {
+ switch config.Type {
+ case 0: // number
+ if numVal, ok := config.Value.(float64); ok {
+ strValue = strconv.FormatFloat(numVal,
'f', -1, 64)
+ } else if intVal, ok := config.Value.(int); ok {
+ strValue = strconv.Itoa(intVal)
+ } else {
+ strValue = fmt.Sprintf("%v",
config.Value)
+ }
+ case 1, 2: // string or password (both are now plain
strings after preprocessing)
+ strValue = fmt.Sprintf("%v", config.Value)
+ default:
+ strValue = fmt.Sprintf("%v", config.Value)
+ }
+ }
+ paramMap[config.Key] = strValue
+ }
+
+ return paramMap
+}
+
+// createParamMap creates a parameter map from configmap entries (legacy
version with decryption)
+// Deprecated: Use PreprocessJobPasswords + createParamMapSafe instead
+func (r *Replacer) createParamMap(configmap []jobtypes.Configmap)
map[string]string {
+ paramMap := make(map[string]string)
+
+ for _, config := range configmap {
+ // Convert value to string based on type, handle null values as
empty strings
+ var strValue string
+ if config.Value == nil {
+ strValue = "" // null values become empty strings
+ } else {
+ switch config.Type {
+ case 0: // number
+ if numVal, ok := config.Value.(float64); ok {
+ strValue = strconv.FormatFloat(numVal,
'f', -1, 64)
+ } else if intVal, ok := config.Value.(int); ok {
+ strValue = strconv.Itoa(intVal)
+ } else {
+ strValue = fmt.Sprintf("%v",
config.Value)
+ }
+ case 1: // string
+ strValue = fmt.Sprintf("%v", config.Value)
+ case 2: // password (encrypted)
+ if encryptedValue, ok := config.Value.(string);
ok {
+ log := logger.DefaultLogger(os.Stdout,
loggertype.LogLevelDebug).WithName("param-replacer")
+ log.Sugar().Debugf("attempting to
decrypt password: %s", encryptedValue)
+ if decoded, err :=
r.decryptPassword(encryptedValue); err == nil {
+ log.Info("password decryption
successful", "length", len(decoded))
+ strValue = decoded
+ } else {
+ log.Error(err, "password
decryption failed, using original value")
+ // Fallback to original value
if decryption fails
+ strValue = encryptedValue
+ }
+ } else {
+ strValue = fmt.Sprintf("%v",
config.Value)
+ }
+ default:
+ strValue = fmt.Sprintf("%v", config.Value)
+ }
+ }
+ paramMap[config.Key] = strValue
+ }
+
+ return paramMap
+}
+
+// replaceMetricsParams replaces parameters in metrics configuration
+func (r *Replacer) replaceMetricsParams(metrics *jobtypes.Metrics, paramMap
map[string]string) error {
+ // Replace parameters in protocol-specific configurations
+ // Currently only JDBC is defined as interface{} in the struct
definition
+ // Other protocols are concrete struct pointers and don't contain
placeholders from JSON
+ if metrics.JDBC != nil {
+ if err := r.replaceProtocolParams(&metrics.JDBC, paramMap); err
!= nil {
+ return fmt.Errorf("failed to replace JDBC params: %w",
err)
+ }
+ }
+
+ // TODO: Add other protocol configurations as they are converted to
interface{} types
+ // For now, other protocols (HTTP, SSH, etc.) are concrete struct
pointers and
+ // don't require parameter replacement
+
+ // Replace parameters in basic metrics fields
+ if err := r.replaceBasicMetricsParams(metrics, paramMap); err != nil {
+ return fmt.Errorf("failed to replace basic metrics params: %w",
err)
+ }
+
+ return nil
+}
+
+// replaceProtocolParams replaces parameters in any protocol configuration
+func (r *Replacer) replaceProtocolParams(protocolInterface *interface{},
paramMap map[string]string) error {
+ if *protocolInterface == nil {
+ return nil
+ }
+
+ // Convert protocol interface{} to map for manipulation
+ protocolMap, ok := (*protocolInterface).(map[string]interface{})
+ if !ok {
+ // If it's already processed or not a map, skip
+ return nil
+ }
+
+ // Recursively replace parameters in all string values
+ return r.replaceParamsInMap(protocolMap, paramMap)
+}
+
+// replaceParamsInMap recursively replaces parameters in a map structure
+func (r *Replacer) replaceParamsInMap(data map[string]interface{}, paramMap
map[string]string) error {
+ for key, value := range data {
+ switch v := value.(type) {
+ case string:
+ // Replace parameters in string values
+ data[key] = r.replaceParamPlaceholders(v, paramMap)
+ case map[string]interface{}:
+ // Recursively handle nested maps
+ if err := r.replaceParamsInMap(v, paramMap); err != nil
{
+ return fmt.Errorf("failed to replace params in
nested map %s: %w", key, err)
+ }
+ case []interface{}:
+ // Handle arrays
+ for i, item := range v {
+ if itemMap, ok :=
item.(map[string]interface{}); ok {
+ if err := r.replaceParamsInMap(itemMap,
paramMap); err != nil {
+ return fmt.Errorf("failed to
replace params in array item %d: %w", i, err)
+ }
+ } else if itemStr, ok := item.(string); ok {
+ v[i] =
r.replaceParamPlaceholders(itemStr, paramMap)
+ }
+ }
+ }
+ }
+ return nil
+}
+
+// replaceBasicMetricsParams replaces parameters in basic metrics fields
+func (r *Replacer) replaceBasicMetricsParams(metrics *jobtypes.Metrics,
paramMap map[string]string) error {
+ // Replace parameters in basic string fields
+ metrics.Host = r.replaceParamPlaceholders(metrics.Host, paramMap)
+ metrics.Port = r.replaceParamPlaceholders(metrics.Port, paramMap)
+ metrics.Timeout = r.replaceParamPlaceholders(metrics.Timeout, paramMap)
+ metrics.Range = r.replaceParamPlaceholders(metrics.Range, paramMap)
+
+ // Replace parameters in ConfigMap
+ if metrics.ConfigMap != nil {
+ for key, value := range metrics.ConfigMap {
+ metrics.ConfigMap[key] =
r.replaceParamPlaceholders(value, paramMap)
+ }
+ }
+
+ return nil
+}
+
+// replaceParamPlaceholders replaces ^_^paramName^_^ placeholders with actual
values
+func (r *Replacer) replaceParamPlaceholders(template string, paramMap
map[string]string) string {
+ // Guard against empty templates to prevent strings.ReplaceAll issues
+ if template == "" {
+ return ""
+ }
+
+ result := template
+
+ // Find all ^_^paramName^_^ patterns and replace them
+ for paramName, paramValue := range paramMap {
+ // Guard against empty parameter names
+ if paramName == "" {
+ continue
+ }
+ placeholder := fmt.Sprintf("^_^%s^_^", paramName)
+ result = strings.ReplaceAll(result, placeholder, paramValue)
+ }
+
+ return result
+}
+
+// ExtractProtocolConfig extracts and processes protocol configuration after
parameter replacement
+func (r *Replacer) ExtractProtocolConfig(protocolInterface interface{},
targetStruct interface{}) error {
+ if protocolInterface == nil {
+ return fmt.Errorf("protocol interface is nil")
+ }
+
+ // If it's a map (from JSON parsing), convert to target struct
+ if protocolMap, ok := protocolInterface.(map[string]interface{}); ok {
+ // Convert map to JSON and then unmarshal to target struct
+ jsonData, err := json.Marshal(protocolMap)
+ if err != nil {
+ return fmt.Errorf("failed to marshal protocol config:
%w", err)
+ }
+
+ if err := json.Unmarshal(jsonData, targetStruct); err != nil {
+ return fmt.Errorf("failed to unmarshal protocol config:
%w", err)
+ }
+
+ return nil
+ }
+
+ return fmt.Errorf("unsupported protocol config type: %T",
protocolInterface)
+}
+
+// ExtractJDBCConfig extracts and processes JDBC configuration after parameter
replacement
+func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{})
(*jobtypes.JDBCProtocol, error) {
+ if jdbcInterface == nil {
+ return nil, nil
+ }
+
+ // If it's already a JDBCProtocol struct
+ if jdbcConfig, ok := jdbcInterface.(*jobtypes.JDBCProtocol); ok {
+ return jdbcConfig, nil
+ }
+
+ // Use the generic extraction method
+ var jdbcConfig jobtypes.JDBCProtocol
+ if err := r.ExtractProtocolConfig(jdbcInterface, &jdbcConfig); err !=
nil {
+ return nil, fmt.Errorf("failed to extract JDBC config: %w", err)
+ }
+
+ return &jdbcConfig, nil
+}
+
+// ExtractHTTPConfig extracts and processes HTTP configuration after parameter
replacement
+func (r *Replacer) ExtractHTTPConfig(httpInterface interface{})
(*jobtypes.HTTPProtocol, error) {
+ if httpInterface == nil {
+ return nil, nil
+ }
+
+ // If it's already an HTTPProtocol struct
+ if httpConfig, ok := httpInterface.(*jobtypes.HTTPProtocol); ok {
+ return httpConfig, nil
+ }
+
+ // Use the generic extraction method
+ var httpConfig jobtypes.HTTPProtocol
+ if err := r.ExtractProtocolConfig(httpInterface, &httpConfig); err !=
nil {
+ return nil, fmt.Errorf("failed to extract HTTP config: %w", err)
+ }
+
+ return &httpConfig, nil
+}
+
+// ExtractSSHConfig extracts and processes SSH configuration after parameter
replacement
+func (r *Replacer) ExtractSSHConfig(sshInterface interface{})
(*jobtypes.SSHProtocol, error) {
+ if sshInterface == nil {
+ return nil, nil
+ }
+
+ // If it's already an SSHProtocol struct
+ if sshConfig, ok := sshInterface.(*jobtypes.SSHProtocol); ok {
+ return sshConfig, nil
+ }
+
+ // Use the generic extraction method
+ var sshConfig jobtypes.SSHProtocol
+ if err := r.ExtractProtocolConfig(sshInterface, &sshConfig); err != nil
{
+ return nil, fmt.Errorf("failed to extract SSH config: %w", err)
+ }
+
+ return &sshConfig, nil
+}
+
+// decryptPassword decrypts an encrypted password using AES
+// This implements the same algorithm as Java manager's AESUtil
+func (r *Replacer) decryptPassword(encryptedPassword string) (string, error) {
+ log := logger.DefaultLogger(os.Stdout,
loggertype.LogLevelDebug).WithName("password-decrypt")
+
+ // Use the AES utility (matches Java: AesUtil.aesDecode)
+ if result, err := crypto.AesDecode(encryptedPassword); err == nil {
+ log.Info("password decrypted successfully", "length",
len(result))
+ return result, nil
+ } else {
+ log.Sugar().Debugf("primary decryption failed: %v", err)
+ }
+
+ // Fallback: try with default Java key if no manager key is set
+ defaultKey := "tomSun28HaHaHaHa"
+ if result, err := crypto.AesDecodeWithKey(encryptedPassword,
defaultKey); err == nil {
+ log.Info("password decrypted with default key", "length",
len(result))
+ return result, nil
+ }
+
+ // If all decryption attempts fail, return original value to allow
system to continue
+ log.Info("all decryption attempts failed, using original value")
+ return encryptedPassword, nil
+}
diff --git a/tools/docker/hcg/Dockerfile b/tools/docker/hcg/Dockerfile
deleted file mode 100644
index 2bb553d..0000000
--- a/tools/docker/hcg/Dockerfile
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-FROM
docker.io/library/busybox@sha256:ab33eacc8251e3807b85bb6dba570e4698c3998eca6f0fc2ccb60575a563ea74
AS builder
-
-# prepare hertzbeat data dir
-RUN mkdir -p /var/hertzbeat
-
-# Use distroless as minimal base image to package the manager binary
-# Refer to https://github.com/GoogleContainerTools/distroless for more details
-FROM
gcr.io/distroless/base-nossl:nonroot@sha256:8981b63f968e829d21351ea9d28cc21127e5f034707f1d8483d2993d9577be0b
-
-COPY --from=builder /var/hertzbeat/ /var/hertzbeat/
-
-# copy binary to image, run make build to generate binary.
-COPY bin /usr/local/bin/
-COPY etc /var/hertzbeat/config/
-
-USER 65532:65532
-
-ENTRYPOINT ["/usr/local/bin/collector", "server", "--config",
"/var/hertzbeat/config/hertzbeat-collector.yaml"]
diff --git a/tools/make/golang.mk b/tools/make/golang.mk
index a67ec75..0fe356f 100644
--- a/tools/make/golang.mk
+++ b/tools/make/golang.mk
@@ -18,46 +18,37 @@
VERSION_PACKAGE :=
hertzbeat.apache.org/hertzbeat-collector-go/internal/cmd/version
GO_LDFLAGS += -X $(VERSION_PACKAGE).hcgVersion=$(shell cat VERSION) \
--X $(VERSION_PACKAGE).gitCommitID=$(GIT_COMMIT)
+ -X $(VERSION_PACKAGE).gitCommitID=$(GIT_COMMIT)
GIT_COMMIT:=$(shell git rev-parse HEAD)
-GOOS ?= $(shell go env GOOS)
-GOARCH ?= $(shell go env GOARCH)
-
##@ Golang
.PHONY: fmt
fmt: ## Golang fmt
- @$(LOG_TARGET)
go fmt ./...
.PHONY: vet
vet: ## Golang vet
- @$(LOG_TARGET)
go vet ./...
.PHONY: dev
dev: ## Golang dev, run main by run.
- @$(LOG_TARGET)
go run cmd/main.go server --config etc/hertzbeat-collector.yaml
.PHONY: prod
prod: ## Golang prod, run bin by run.
- @$(LOG_TARGET)
bin/collector server --config etc/hertzbeat-collector.yaml
.PHONY: build
-build: GOOS = linux
+# build
build: ## Golang build
- @$(LOG_TARGET)
@version=$$(cat VERSION); \
# todo; 添加交叉编译支持
- CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o bin/collector
-ldflags "$(GO_LDFLAGS)" cmd/main.go
+ CGO_ENABLED=0 go build -o bin/collector -ldflags "$(GO_LDFLAGS)"
cmd/main.go
.PHONY: init
init: ## install base. For proto compile.
- @$(LOG_TARGET)
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
go install github.com/go-kratos/kratos/cmd/protoc-gen-go-http/v2@latest
@@ -67,7 +58,6 @@ init: ## install base. For proto compile.
# generate api proto
api: API_PROTO_FILES := $(wildcard api/*.proto)
api: ## compile api proto files
- @$(LOG_TARGET)
protoc --proto_path=./api \
--go_out=paths=source_relative:./api \
--go-http_out=paths=source_relative:./api \
@@ -76,10 +66,12 @@ api: ## compile api proto files
.PHONY: go-lint
go-lint: ## run golang lint
- @$(LOG_TARGET)
golangci-lint run --config tools/linter/golang-ci/.golangci.yml
.PHONY: test
test: ## run golang test
- @$(LOG_TARGET)
go test -v ./...
+
+.PHONY: golang-all
+golang-all: ## run fmt lint vet build api test
+golang-all: fmt go-lint vet build api test
diff --git a/tools/make/image.mk b/tools/make/image.mk
deleted file mode 100644
index fc37001..0000000
--- a/tools/make/image.mk
+++ /dev/null
@@ -1,58 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# This is a wrapper to build and push docker image
-#
-
-# All make targets related to docker image are defined in this file.
-
-REGISTRY ?= docker.io
-
-TAG ?= $(shell git rev-parse HEAD)
-
-DOCKER := docker
-DOCKER_SUPPORTED_API_VERSION ?= 1.32
-
-IMAGES_DIR ?= $(wildcard tools/docker/hcg)
-
-IMAGES ?= hertzbeat-collector-go
-IMAGE_PLATFORMS ?= amd64 arm64
-
-BUILDX_CONTEXT = hcg-build-tools-builder
-
-##@ Image
-
-# todo: multi-platform build
-
-.PHONY: image-build
-image-build: ## Build docker image
-image-build: IMAGE_PLATFORMS = ${shell uname -m}
-image-build:
- @$(LOG_TARGET)
- make build
- $(DOCKER) buildx create --name $(BUILDX_CONTEXT) --use; \
- $(DOCKER) buildx use $(BUILDX_CONTEXT); \
- $(DOCKER) buildx build --load \
- -t $(REGISTRY)/${IMAGES}:$(TAG) \
- --platform linux/${IMAGE_PLATFORMS} \
- --file $(IMAGES_DIR)/Dockerfile . ; \
- $(DOCKER) buildx rm $(BUILDX_CONTEXT)
-
-.PHONY: image-push
-image-push: ## Push docker image
-image-push:
- @$(LOG_TARGET)
- $(DOCKER) push $(REGISTRY)/$${image}:$(TAG)-$${platform}; \
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]