This is an automated email from the ASF dual-hosted git repository.

liuhongyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c1de38  feat(core): transport scheduler integration (#19)
2c1de38 is described below

commit 2c1de38d6a5cb615473aace7485b8ac38289421e
Author: 铁甲小宝 <[email protected]>
AuthorDate: Sat Sep 27 19:21:06 2025 +0800

    feat(core): transport scheduler integration (#19)
    
    * add: The first version of the modified schedule
    
    * 1. Fix the scheduling accuracy of critical time wheels and the 
rescheduling of cyclic tasks
    2. Support Java-compatible AES encryption and dynamic key management
    3. Solve the problem of concurrent map access through one-time password 
preprocessing
    4. Add comprehensive operation statistics and monitoring functions
    5. manager message reception
    
    * modified:
    1. Use the key when deleting the test.
    2. Reset the order of magnitude of the request time.
---
 Dockerfile                                         |  63 ++++
 Makefile                                           |  11 +-
 .../docker-compose.yml => docker-compose.yml       |   4 +-
 internal/cmd/server.go                             |  19 +-
 .../collector/basic/database/jdbc_collector.go     | 105 +++---
 .../common/collect/dispatch/metrics_collector.go   |  10 +-
 .../collector/common/collect/result_handler.go     |   9 +-
 .../common/collect/strategy/strategy_factory.go    |   9 -
 .../common/dispatcher/common_dispatcher.go         |  67 +++-
 .../common/dispatcher/hashed_wheel_timer.go        |  41 ++-
 internal/collector/common/dispatcher/time_wheel.go | 321 ++++++++++++++++-
 .../common/dispatcher/wheel_timer_task.go          |  57 ++-
 internal/collector/common/router/message_router.go | 269 ++++++++++++++
 internal/collector/common/transport/transport.go   |  28 +-
 internal/collector/common/types/job/job_types.go   |   9 +-
 .../collector/common/types/job/metrics_types.go    | 111 ++++--
 internal/transport/netty_client.go                 |   8 +-
 internal/transport/processors.go                   | 265 ++++++++++++--
 internal/util/crypto/aes_util.go                   | 194 ++++++++++
 internal/util/param/param_replacer.go              | 391 +++++++++++++++++++++
 tools/docker/hcg/Dockerfile                        |  35 --
 tools/make/golang.mk                               |  22 +-
 tools/make/image.mk                                |  58 ---
 23 files changed, 1810 insertions(+), 296 deletions(-)

diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..74c38a7
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+FROM golang:1.23-alpine AS golang-builder
+
+ARG GOPROXY
+# ENV GOPROXY ${GOPROXY:-direct}
+# ENV GOPROXY=https://proxy.golang.com.cn,direct
+
+ENV GOPATH /go
+ENV GOROOT /usr/local/go
+ENV PACKAGE hertzbeat.apache.org/hertzbeat-collector-go
+ENV BUILD_DIR /app
+
+COPY . ${BUILD_DIR}
+WORKDIR ${BUILD_DIR}
+RUN apk --no-cache add build-base git bash
+
+RUN make init && \
+    make fmt && \
+    make go-lint &&\
+    make build
+
+RUN chmod +x bin/collector
+
+FROM alpine
+
+ARG TIMEZONE
+ENV TIMEZONE=${TIMEZONE:-"Asia/Shanghai"}
+
+RUN apk update \
+    && apk --no-cache add \
+        bash \
+        ca-certificates \
+        curl \
+        dumb-init \
+        gettext \
+        openssh \
+        sqlite \
+        gnupg \
+        tzdata \
+    && ln -sf /usr/share/zoneinfo/${TIMEZONE} /etc/localtime \
+    && echo "${TIMEZONE}" > /etc/timezone
+
+COPY --from=golang-builder /app/bin/collector /usr/local/bin/collector
+COPY --from=golang-builder /app/etc/hertzbeat-collector.yml 
/etc/hertzbeat-collector.yml
+
+EXPOSE 8090
+ENTRYPOINT ["collector", "server", "--config", "/etc/hertzbeat-collector.yml"]
diff --git a/Makefile b/Makefile
index 6b33134..cd02530 100644
--- a/Makefile
+++ b/Makefile
@@ -17,12 +17,11 @@
 
 _run:
        @$(MAKE) --warn-undefined-variables \
-       -f tools/make/common.mk \
-       -f tools/make/golang.mk \
-       -f tools/make/linter.mk \
-       -f tools/make/image.mk \
-       -f tools/make/tools.mk \
-       $(MAKECMDGOALS)
+               -f tools/make/common.mk \
+               -f tools/make/golang.mk \
+               -f tools/make/linter.mk \
+               -f tools/make/tools.mk \
+               $(MAKECMDGOALS)
 
 .PHONY: _run
 
diff --git a/tools/docker/docker-compose/docker-compose.yml b/docker-compose.yml
similarity index 93%
rename from tools/docker/docker-compose/docker-compose.yml
rename to docker-compose.yml
index 427b692..7c158b5 100644
--- a/tools/docker/docker-compose/docker-compose.yml
+++ b/docker-compose.yml
@@ -19,11 +19,11 @@ services:
 
   # collector go service
   hertzbeat-collector:
-    image: hertzbeat-collector-go:latest
+    image: hertzbeat/hertzbeat-collector-go:latest
     container_name: hertzbeat-collector-go
     restart: on-failure
     ports:
-      - "8080:8080"
+      - "8090:8090"
     networks:
       hcg-network:
 
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 5ff4c45..1e98ceb 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -108,17 +108,20 @@ func server(ctx context.Context, logOut io.Writer) error {
 }
 
 func startRunners(ctx context.Context, cfg *clrserver.Server) error {
+       // Create job server first
+       jobRunner := jobserver.New(&jobserver.Config{
+               Server: *cfg,
+       })
+
+       // Create transport server and connect it to job server
+       transportRunner := transportserver.NewFromConfig(cfg.Config)
+       transportRunner.SetJobScheduler(jobRunner) // Connect transport to job 
scheduler
+
        runners := []struct {
                runner Runner[collectortypes.Info]
        }{
-               {
-                       jobserver.New(&jobserver.Config{
-                               Server: *cfg,
-                       }),
-               },
-               {
-                       transportserver.NewFromConfig(cfg.Config),
-               },
+               {jobRunner},
+               {transportRunner},
                // todo; add metrics
        }
 
diff --git a/internal/collector/basic/database/jdbc_collector.go 
b/internal/collector/basic/database/jdbc_collector.go
index 01fa7f2..dd74b2f 100644
--- a/internal/collector/basic/database/jdbc_collector.go
+++ b/internal/collector/basic/database/jdbc_collector.go
@@ -30,6 +30,7 @@ import (
        _ "github.com/microsoft/go-mssqldb"
        jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
 )
 
 const (
@@ -67,6 +68,13 @@ func NewJDBCCollector(logger logger.Logger) *JDBCCollector {
        }
 }
 
+// extractJDBCConfig extracts JDBC configuration from interface{} type
+// This function uses the parameter replacer for consistent configuration 
extraction
+func extractJDBCConfig(jdbcInterface interface{}) (*jobtypes.JDBCProtocol, 
error) {
+       replacer := param.NewReplacer()
+       return replacer.ExtractJDBCConfig(jdbcInterface)
+}
+
 // PreCheck validates the JDBC metrics configuration
 func (jc *JDBCCollector) PreCheck(metrics *jobtypes.Metrics) error {
        if metrics == nil {
@@ -77,44 +85,51 @@ func (jc *JDBCCollector) PreCheck(metrics 
*jobtypes.Metrics) error {
                return fmt.Errorf("JDBC protocol configuration is required")
        }
 
-       jdbc := metrics.JDBC
+       // Extract JDBC configuration
+       jdbcConfig, err := extractJDBCConfig(metrics.JDBC)
+       if err != nil {
+               return fmt.Errorf("invalid JDBC configuration: %w", err)
+       }
+       if jdbcConfig == nil {
+               return fmt.Errorf("JDBC configuration is required")
+       }
 
        // Validate required fields when URL is not provided
-       if jdbc.URL == "" {
-               if jdbc.Host == "" {
+       if jdbcConfig.URL == "" {
+               if jdbcConfig.Host == "" {
                        return fmt.Errorf("host is required when URL is not 
provided")
                }
-               if jdbc.Port == "" {
+               if jdbcConfig.Port == "" {
                        return fmt.Errorf("port is required when URL is not 
provided")
                }
-               if jdbc.Platform == "" {
+               if jdbcConfig.Platform == "" {
                        return fmt.Errorf("platform is required when URL is not 
provided")
                }
        }
 
        // Validate platform
-       if jdbc.Platform != "" {
-               switch jdbc.Platform {
+       if jdbcConfig.Platform != "" {
+               switch jdbcConfig.Platform {
                case PlatformMySQL, PlatformMariaDB, PlatformPostgreSQL, 
PlatformSQLServer, PlatformOracle:
                        // Valid platforms
                default:
-                       return fmt.Errorf("unsupported database platform: %s", 
jdbc.Platform)
+                       return fmt.Errorf("unsupported database platform: %s", 
jdbcConfig.Platform)
                }
        }
 
        // Validate query type
-       if jdbc.QueryType != "" {
-               switch jdbc.QueryType {
+       if jdbcConfig.QueryType != "" {
+               switch jdbcConfig.QueryType {
                case QueryTypeOneRow, QueryTypeMultiRow, QueryTypeColumns, 
QueryTypeRunScript:
                        // Valid query types
                default:
-                       return fmt.Errorf("unsupported query type: %s", 
jdbc.QueryType)
+                       return fmt.Errorf("unsupported query type: %s", 
jdbcConfig.QueryType)
                }
        }
 
        // Validate SQL for most query types
-       if jdbc.QueryType != QueryTypeRunScript && jdbc.SQL == "" {
-               return fmt.Errorf("SQL is required for query type: %s", 
jdbc.QueryType)
+       if jdbcConfig.QueryType != QueryTypeRunScript && jdbcConfig.SQL == "" {
+               return fmt.Errorf("SQL is required for query type: %s", 
jdbcConfig.QueryType)
        }
 
        return nil
@@ -123,27 +138,35 @@ func (jc *JDBCCollector) PreCheck(metrics 
*jobtypes.Metrics) error {
 // Collect performs JDBC metrics collection
 func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) 
*jobtypes.CollectRepMetricsData {
        startTime := time.Now()
-       jdbc := metrics.JDBC
 
-       jc.logger.Info("starting JDBC collection",
-               "host", jdbc.Host,
-               "port", jdbc.Port,
-               "platform", jdbc.Platform,
-               "database", jdbc.Database,
-               "queryType", jdbc.QueryType)
+       // Extract JDBC configuration
+       jdbcConfig, err := extractJDBCConfig(metrics.JDBC)
+       if err != nil {
+               jc.logger.Error(err, "failed to extract JDBC config")
+               return jc.createFailResponse(metrics, CodeFail, 
fmt.Sprintf("JDBC config error: %v", err))
+       }
+       if jdbcConfig == nil {
+               return jc.createFailResponse(metrics, CodeFail, "JDBC 
configuration is required")
+       }
+
+       // Debug level only for collection start
+       jc.logger.V(1).Info("starting JDBC collection",
+               "host", jdbcConfig.Host,
+               "platform", jdbcConfig.Platform,
+               "queryType", jdbcConfig.QueryType)
 
        // Get timeout
-       timeout := jc.getTimeout(jdbc.Timeout)
+       timeout := jc.getTimeout(jdbcConfig.Timeout)
 
        // Get database URL
-       databaseURL, err := jc.constructDatabaseURL(jdbc)
+       databaseURL, err := jc.constructDatabaseURL(jdbcConfig)
        if err != nil {
                jc.logger.Error(err, "failed to construct database URL")
                return jc.createFailResponse(metrics, CodeFail, 
fmt.Sprintf("Database URL error: %v", err))
        }
 
        // Create database connection
-       db, err := jc.getConnection(databaseURL, jdbc.Username, jdbc.Password, 
timeout)
+       db, err := jc.getConnection(databaseURL, jdbcConfig.Username, 
jdbcConfig.Password, timeout)
        if err != nil {
                jc.logger.Error(err, "failed to connect to database")
                return jc.createFailResponse(metrics, CodeUnConnectable, 
fmt.Sprintf("Connection error: %v", err))
@@ -153,26 +176,27 @@ func (jc *JDBCCollector) Collect(metrics 
*jobtypes.Metrics) *jobtypes.CollectRep
        // Execute query based on type with context
        response := jc.createSuccessResponse(metrics)
 
-       switch jdbc.QueryType {
+       switch jdbcConfig.QueryType {
        case QueryTypeOneRow:
-               err = jc.queryOneRow(db, jdbc.SQL, metrics.Aliasfields, 
response)
+               err = jc.queryOneRow(db, jdbcConfig.SQL, metrics.Aliasfields, 
response)
        case QueryTypeMultiRow:
-               err = jc.queryMultiRow(db, jdbc.SQL, metrics.Aliasfields, 
response)
+               err = jc.queryMultiRow(db, jdbcConfig.SQL, metrics.Aliasfields, 
response)
        case QueryTypeColumns:
-               err = jc.queryColumns(db, jdbc.SQL, metrics.Aliasfields, 
response)
+               err = jc.queryColumns(db, jdbcConfig.SQL, metrics.Aliasfields, 
response)
        case QueryTypeRunScript:
-               err = jc.runScript(db, jdbc.SQL, response)
+               err = jc.runScript(db, jdbcConfig.SQL, response)
        default:
-               err = fmt.Errorf("unsupported query type: %s", jdbc.QueryType)
+               err = fmt.Errorf("unsupported query type: %s", 
jdbcConfig.QueryType)
        }
 
        if err != nil {
-               jc.logger.Error(err, "query execution failed", "queryType", 
jdbc.QueryType)
+               jc.logger.Error(err, "query execution failed", "queryType", 
jdbcConfig.QueryType)
                return jc.createFailResponse(metrics, CodeFail, 
fmt.Sprintf("Query error: %v", err))
        }
 
        duration := time.Since(startTime)
-       jc.logger.Info("JDBC collection completed",
+       // Debug level only for successful completion
+       jc.logger.V(1).Info("JDBC collection completed",
                "duration", duration,
                "rowCount", len(response.Values))
 
@@ -195,9 +219,8 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string) 
time.Duration {
                return duration
        }
 
-       // If it's a pure number, treat it as seconds (more reasonable for 
database connections)
        if timeout, err := strconv.Atoi(timeoutStr); err == nil {
-               return time.Duration(timeout) * time.Second
+               return time.Duration(timeout) * time.Millisecond
        }
 
        return 30 * time.Second // fallback to default
@@ -205,10 +228,6 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string) 
time.Duration {
 
 // constructDatabaseURL constructs the database connection URL
 func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol) 
(string, error) {
-       // If URL is provided directly, use it
-       if jdbc.URL != "" {
-               return jdbc.URL, nil
-       }
 
        // Construct URL based on platform
        host := jdbc.Host
@@ -217,7 +236,8 @@ func (jc *JDBCCollector) constructDatabaseURL(jdbc 
*jobtypes.JDBCProtocol) (stri
 
        switch jdbc.Platform {
        case PlatformMySQL, PlatformMariaDB:
-               return 
fmt.Sprintf("mysql://%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4",
+               // MySQL DSN format: 
[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
+               return 
fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4",
                        jdbc.Username, jdbc.Password, host, port, database), nil
        case PlatformPostgreSQL:
                return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable",
@@ -234,14 +254,13 @@ func (jc *JDBCCollector) constructDatabaseURL(jdbc 
*jobtypes.JDBCProtocol) (stri
 func (jc *JDBCCollector) getConnection(databaseURL, username, password string, 
timeout time.Duration) (*sql.DB, error) {
        // Extract driver name from URL
        var driverName string
-       if strings.HasPrefix(databaseURL, "mysql://") {
-               driverName = "mysql"
-               // Remove mysql:// prefix for the driver
-               databaseURL = strings.TrimPrefix(databaseURL, "mysql://")
-       } else if strings.HasPrefix(databaseURL, "postgres://") {
+       if strings.HasPrefix(databaseURL, "postgres://") {
                driverName = "postgres"
        } else if strings.HasPrefix(databaseURL, "sqlserver://") {
                driverName = "sqlserver"
+       } else if strings.Contains(databaseURL, "@tcp(") {
+               // MySQL DSN format (no protocol prefix)
+               driverName = "mysql"
        } else {
                return nil, fmt.Errorf("unsupported database URL format: %s", 
databaseURL)
        }
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go 
b/internal/collector/common/collect/dispatch/metrics_collector.go
index 85e7580..bc3ea2e 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -53,8 +53,8 @@ func (mc *MetricsCollector) CollectMetrics(metrics 
*jobtypes.Metrics, job *jobty
        go func() {
                defer close(resultChan)
 
-               mc.logger.Info("starting metrics collection",
-                       "jobID", job.ID,
+               // Debug level only for collection start
+               mc.logger.V(1).Info("starting metrics collection",
                        "metricsName", metrics.Name,
                        "protocol", metrics.Protocol)
 
@@ -72,7 +72,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics 
*jobtypes.Metrics, job *jobty
                        return
                }
 
-               // Perform the actual collection
+               // Perform the actual collection with metrics (parameters 
should already be replaced by CommonDispatcher)
                result := collector.Collect(metrics)
 
                // Enrich result with job information
@@ -101,9 +101,9 @@ func (mc *MetricsCollector) CollectMetrics(metrics 
*jobtypes.Metrics, job *jobty
 
                duration := time.Since(startTime)
 
+               // Only log failures at INFO level, success at debug level
                if result != nil && result.Code == http.StatusOK {
-                       mc.logger.Info("metrics collection completed 
successfully",
-                               "jobID", job.ID,
+                       mc.logger.V(1).Info("metrics collection completed 
successfully",
                                "metricsName", metrics.Name,
                                "protocol", metrics.Protocol,
                                "duration", duration,
diff --git a/internal/collector/common/collect/result_handler.go 
b/internal/collector/common/collect/result_handler.go
index 7f5a304..22ceb1f 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/collector/common/collect/result_handler.go
@@ -52,9 +52,8 @@ func (rh *ResultHandlerImpl) HandleCollectData(data 
*jobtypes.CollectRepMetricsD
                return fmt.Errorf("collect data is nil")
        }
 
-       rh.logger.Info("handling collect data",
-               "jobID", job.ID,
-               "monitorID", job.MonitorID,
+       // Debug level only for data handling
+       rh.logger.V(1).Info("handling collect data",
                "metricsName", data.Metrics,
                "code", data.Code,
                "valuesCount", len(data.Values))
@@ -67,9 +66,9 @@ func (rh *ResultHandlerImpl) HandleCollectData(data 
*jobtypes.CollectRepMetricsD
        // 4. Triggering alerts based on thresholds
        // 5. Updating monitoring status
 
+       // Only log failures at INFO level, success at debug level
        if data.Code == http.StatusOK {
-               rh.logger.Info("successfully processed collect data",
-                       "jobID", job.ID,
+               rh.logger.V(1).Info("successfully processed collect data",
                        "metricsName", data.Metrics)
        } else {
                rh.logger.Info("received failed collect data",
diff --git a/internal/collector/common/collect/strategy/strategy_factory.go 
b/internal/collector/common/collect/strategy/strategy_factory.go
index fa951b0..9fc9af1 100644
--- a/internal/collector/common/collect/strategy/strategy_factory.go
+++ b/internal/collector/common/collect/strategy/strategy_factory.go
@@ -76,15 +76,6 @@ func InitializeCollectors(logger logger.Logger) {
        }
 
        globalRegistry.initialized = true
-
-       // Log initialization results
-       protocols := make([]string, 0, len(globalRegistry.collectors))
-       for protocol := range globalRegistry.collectors {
-               protocols = append(protocols, protocol)
-       }
-       logger.Info("collectors initialized from factories",
-               "protocols", protocols,
-               "count", len(protocols))
 }
 
 // Register a collector directly (legacy method, kept for compatibility)
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go 
b/internal/collector/common/dispatcher/common_dispatcher.go
index 137304a..8d37be5 100644
--- a/internal/collector/common/dispatcher/common_dispatcher.go
+++ b/internal/collector/common/dispatcher/common_dispatcher.go
@@ -28,6 +28,7 @@ import (
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
        jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
 )
 
 // CommonDispatcherImpl is responsible for breaking down jobs into individual 
metrics collection tasks
@@ -59,9 +60,9 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx 
context.Context, job *jo
                return fmt.Errorf("job cannot be nil")
        }
 
-       cd.logger.Info("dispatching metrics task",
+       // Dispatching metrics task - only log at debug level
+       cd.logger.V(1).Info("dispatching metrics task",
                "jobID", job.ID,
-               "monitorID", job.MonitorID,
                "app", job.App,
                "metricsCount", len(job.Metrics))
 
@@ -74,6 +75,16 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx 
context.Context, job *jo
                return nil
        }
 
+       // Replace parameters in job configuration ONCE before concurrent 
collection
+       // This avoids concurrent map access issues in MetricsCollector
+       paramReplacer := param.NewReplacer()
+       processedJob, err := paramReplacer.ReplaceJobParams(job)
+       if err != nil {
+               cd.logger.Error(err, "failed to replace job parameters",
+                       "jobID", job.ID)
+               return fmt.Errorf("parameter replacement failed: %w", err)
+       }
+
        // Create collection context with timeout
        collectCtx, collectCancel := context.WithTimeout(ctx, 
cd.getCollectionTimeout(job))
        defer collectCancel()
@@ -82,13 +93,26 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx 
context.Context, job *jo
        resultChannels := make([]chan *jobtypes.CollectRepMetricsData, 
len(metricsToCollect))
 
        for i, metrics := range metricsToCollect {
-               cd.logger.Info("starting metrics collection",
-                       "jobID", job.ID,
+               // Starting metrics collection - debug level only
+               cd.logger.V(1).Info("starting metrics collection",
                        "metricsName", metrics.Name,
                        "protocol", metrics.Protocol)
 
-               // Start metrics collection in goroutine
-               resultChannels[i] = cd.metricsCollector.CollectMetrics(metrics, 
job, timeout)
+               // Find the processed metrics in the processed job
+               var processedMetrics *jobtypes.Metrics
+               for j := range processedJob.Metrics {
+                       if processedJob.Metrics[j].Name == metrics.Name {
+                               processedMetrics = &processedJob.Metrics[j]
+                               break
+                       }
+               }
+               if processedMetrics == nil {
+                       cd.logger.Error(nil, "processed metrics not found", 
"metricsName", metrics.Name)
+                       continue
+               }
+
+               // Start metrics collection in goroutine with processed data
+               resultChannels[i] = 
cd.metricsCollector.CollectMetrics(processedMetrics, processedJob, timeout)
        }
 
        // Collect results from all metrics collection tasks
@@ -100,8 +124,8 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx 
context.Context, job *jo
                case result := <-resultChan:
                        if result != nil {
                                results = append(results, result)
-                               cd.logger.Info("received metrics result",
-                                       "jobID", job.ID,
+                               // Metrics result received - debug level only
+                               cd.logger.V(1).Info("received metrics result",
                                        "metricsName", metricsToCollect[i].Name,
                                        "code", result.Code)
                        }
@@ -123,18 +147,26 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx 
context.Context, job *jo
                return err
        }
 
-       cd.logger.Info("successfully dispatched metrics task",
-               "jobID", job.ID,
-               "resultsCount", len(results),
-               "errorsCount", len(errors),
-               "duration", duration)
+       // Only log summary for successful tasks if there were errors
+       if len(errors) > 0 {
+               cd.logger.Info("metrics task completed with errors",
+                       "jobID", job.ID,
+                       "errorsCount", len(errors),
+                       "duration", duration)
+       } else {
+               cd.logger.V(1).Info("metrics task completed successfully",
+                       "jobID", job.ID,
+                       "resultsCount", len(results),
+                       "duration", duration)
+       }
 
        return nil
 }
 
 // handleResults processes the collection results and decides on next actions
 func (cd *CommonDispatcherImpl) handleResults(results 
[]*jobtypes.CollectRepMetricsData, job *jobtypes.Job, errors []error) error {
-       cd.logger.Info("handling collection results",
+       // Debug level only for result handling
+       cd.logger.V(1).Info("handling collection results",
                "jobID", job.ID,
                "resultsCount", len(results),
                "errorsCount", len(errors))
@@ -163,7 +195,8 @@ func (cd *CommonDispatcherImpl) handleResults(results 
[]*jobtypes.CollectRepMetr
 
 // evaluateNextActions decides what to do next based on collection results
 func (cd *CommonDispatcherImpl) evaluateNextActions(job *jobtypes.Job, results 
[]*jobtypes.CollectRepMetricsData, errors []error) {
-       cd.logger.Info("evaluating next actions",
+       // Debug level only for next actions evaluation
+       cd.logger.V(1).Info("evaluating next actions",
                "jobID", job.ID,
                "resultsCount", len(results),
                "errorsCount", len(errors))
@@ -172,13 +205,13 @@ func (cd *CommonDispatcherImpl) evaluateNextActions(job 
*jobtypes.Job, results [
        hasNextLevel := cd.hasNextLevelMetrics(job, results)
 
        if hasNextLevel {
-               cd.logger.Info("job has next level metrics to collect", 
"jobID", job.ID)
+               cd.logger.V(1).Info("job has next level metrics to collect", 
"jobID", job.ID)
                // TODO: Schedule next level metrics collection
        }
 
        // For cyclic jobs, the rescheduling is handled by WheelTimerTask
        if job.IsCyclic {
-               cd.logger.Info("cyclic job will be rescheduled by timer", 
"jobID", job.ID)
+               cd.logger.V(1).Info("cyclic job will be rescheduled by timer", 
"jobID", job.ID)
        }
 
        // TODO: Send results to data queue for further processing
diff --git a/internal/collector/common/dispatcher/hashed_wheel_timer.go 
b/internal/collector/common/dispatcher/hashed_wheel_timer.go
index 3ebe931..9100205 100644
--- a/internal/collector/common/dispatcher/hashed_wheel_timer.go
+++ b/internal/collector/common/dispatcher/hashed_wheel_timer.go
@@ -100,7 +100,8 @@ func (hwt *hashedWheelTimer) NewTimeout(task 
jobtypes.TimerTask, delay time.Dura
        bucket.timeouts = append(bucket.timeouts, timeout)
        bucket.mu.Unlock()
 
-       hwt.logger.Info("created timeout",
+       // Log timeout creation only for debugging
+       hwt.logger.V(1).Info("created timeout",
                "delay", delay,
                "bucketIndex", bucketIndex,
                "targetTick", targetTick)
@@ -114,9 +115,7 @@ func (hwt *hashedWheelTimer) Start(ctx context.Context) 
error {
                return nil // already started
        }
 
-       hwt.logger.Info("starting hashed wheel timer",
-               "wheelSize", hwt.wheelSize,
-               "tickDuration", hwt.tickDuration)
+       hwt.logger.Info("starting hashed wheel timer")
 
        hwt.startTime = time.Now()
        hwt.ticker = time.NewTicker(hwt.tickDuration)
@@ -138,8 +137,6 @@ func (hwt *hashedWheelTimer) Stop() error {
                return nil // already stopped
        }
 
-       hwt.logger.Info("stopping hashed wheel timer")
-
        hwt.cancel()
 
        if hwt.ticker != nil {
@@ -174,6 +171,14 @@ func (hwt *hashedWheelTimer) tick() {
        bucket := hwt.wheel[bucketIndex]
        bucket.mu.Lock()
 
+       // Only log tick processing if there are timeouts to process
+       if len(bucket.timeouts) > 0 {
+               hwt.logger.V(1).Info("processing tick",
+                       "currentTick", currentTick,
+                       "bucketIndex", bucketIndex,
+                       "timeoutsCount", len(bucket.timeouts))
+       }
+
        // Process expired timeouts
        var remaining []*jobtypes.Timeout
        for _, timeout := range bucket.timeouts {
@@ -181,8 +186,18 @@ func (hwt *hashedWheelTimer) tick() {
                        continue // skip cancelled timeouts
                }
 
-               // Check if timeout has expired
-               if time.Now().After(timeout.Deadline()) {
+               now := time.Now()
+               deadline := timeout.Deadline()
+
+               // Check if timeout should execute based on wheel position
+               // A timeout should execute when currentTick >= its targetTick
+               wheelIndex := timeout.WheelIndex()
+               if currentTick >= int64(wheelIndex) {
+                       // Only log actual task execution at INFO level
+                       hwt.logger.Info("executing scheduled task",
+                               "currentTick", currentTick,
+                               "targetTick", wheelIndex)
+
                        // Execute the timeout task asynchronously
                        go func(t *jobtypes.Timeout) {
                                defer func() {
@@ -196,7 +211,12 @@ func (hwt *hashedWheelTimer) tick() {
                                }
                        }(timeout)
                } else {
-                       // Not yet expired, keep in bucket
+                       // Not yet time to execute, keep in bucket
+                       hwt.logger.V(1).Info("timeout not ready by wheel 
position",
+                               "currentTick", currentTick,
+                               "targetTick", wheelIndex,
+                               "deadline", deadline,
+                               "now", now)
                        remaining = append(remaining, timeout)
                }
        }
@@ -216,10 +236,11 @@ func (hwt *hashedWheelTimer) GetStats() 
map[string]interface{} {
 
        return map[string]interface{}{
                "wheelSize":     hwt.wheelSize,
-               "tickDuration":  hwt.tickDuration,
+               "tickDuration":  hwt.tickDuration.String(),
                "currentTick":   atomic.LoadInt64(&hwt.currentTick),
                "totalTimeouts": totalTimeouts,
                "started":       atomic.LoadInt32(&hwt.started) == 1,
                "stopped":       atomic.LoadInt32(&hwt.stopped) == 1,
+               "wheelPeriod":   (time.Duration(hwt.wheelSize) * 
hwt.tickDuration).String(),
        }
 }
diff --git a/internal/collector/common/dispatcher/time_wheel.go 
b/internal/collector/common/dispatcher/time_wheel.go
index 30d02ad..d8c7aa1 100644
--- a/internal/collector/common/dispatcher/time_wheel.go
+++ b/internal/collector/common/dispatcher/time_wheel.go
@@ -29,6 +29,16 @@ import (
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
+// JobInfo holds information about a scheduled job
+type JobInfo struct {
+       Job             *jobtypes.Job
+       Timeout         *jobtypes.Timeout
+       CreatedAt       time.Time
+       LastExecutedAt  time.Time
+       NextExecutionAt time.Time
+       ExecutionCount  int64
+}
+
 // TimeDispatch manages time-based job scheduling using a hashed wheel timer
 type TimeDispatch struct {
        logger           logger.Logger
@@ -36,8 +46,26 @@ type TimeDispatch struct {
        commonDispatcher MetricsTaskDispatcher
        cyclicTasks      sync.Map // map[int64]*jobtypes.Timeout for cyclic jobs
        tempTasks        sync.Map // map[int64]*jobtypes.Timeout for one-time 
jobs
+       jobInfos         sync.Map // map[int64]*JobInfo for detailed job 
information
        started          bool
        mu               sync.RWMutex
+
+       // Statistics
+       stats       *DispatcherStats
+       statsLogger *time.Ticker
+       statsCancel context.CancelFunc
+}
+
+// DispatcherStats holds dispatcher statistics
+type DispatcherStats struct {
+       StartTime          time.Time
+       TotalJobsAdded     int64
+       TotalJobsRemoved   int64
+       TotalJobsExecuted  int64
+       TotalJobsCompleted int64
+       TotalJobsFailed    int64
+       LastExecutionTime  time.Time
+       mu                 sync.RWMutex
 }
 
 // MetricsTaskDispatcher interface for metrics task dispatching
@@ -50,6 +78,7 @@ type HashedWheelTimer interface {
        NewTimeout(task jobtypes.TimerTask, delay time.Duration) 
*jobtypes.Timeout
        Start(ctx context.Context) error
        Stop() error
+       GetStats() map[string]interface{}
 }
 
 // NewTimeDispatch creates a new time dispatcher
@@ -58,6 +87,9 @@ func NewTimeDispatch(logger logger.Logger, commonDispatcher 
MetricsTaskDispatche
                logger:           logger.WithName("time-dispatch"),
                commonDispatcher: commonDispatcher,
                started:          false,
+               stats: &DispatcherStats{
+                       StartTime: time.Now(),
+               },
        }
 
        // Create hashed wheel timer with reasonable defaults
@@ -74,32 +106,56 @@ func (td *TimeDispatch) AddJob(job *jobtypes.Job) error {
                return fmt.Errorf("job cannot be nil")
        }
 
-       td.logger.Info("adding job to time dispatcher",
+       // Log job addition at debug level
+       td.logger.V(1).Info("adding job",
                "jobID", job.ID,
-               "isCyclic", job.IsCyclic,
                "interval", job.DefaultInterval)
 
+       // Update statistics
+       td.stats.mu.Lock()
+       td.stats.TotalJobsAdded++
+       td.stats.mu.Unlock()
+
        // Create wheel timer task
-       timerTask := NewWheelTimerTask(job, td.commonDispatcher, td.logger)
+       timerTask := NewWheelTimerTask(job, td.commonDispatcher, td, td.logger)
 
        // Calculate delay
        var delay time.Duration
        if job.DefaultInterval > 0 {
-               delay = time.Duration(job.DefaultInterval) * time.Second
+               delay = time.Duration(job.DefaultInterval) * time.Millisecond
+               td.logger.V(1).Info("calculated delay",
+                       "interval", job.DefaultInterval,
+                       "delay", delay)
        } else {
-               delay = 30 * time.Second // default interval
+               delay = 10 * time.Second
        }
 
        // Create timeout
        timeout := td.wheelTimer.NewTimeout(timerTask, delay)
 
-       // Store timeout based on job type
+       // Create job info
+       now := time.Now()
+       jobInfo := &JobInfo{
+               Job:             job,
+               Timeout:         timeout,
+               CreatedAt:       now,
+               NextExecutionAt: now.Add(delay),
+               ExecutionCount:  0,
+       }
+
+       // Store timeout and job info based on job type
        if job.IsCyclic {
                td.cyclicTasks.Store(job.ID, timeout)
-               td.logger.Info("added cyclic job to scheduler", "jobID", 
job.ID, "delay", delay)
+               td.jobInfos.Store(job.ID, jobInfo)
+               td.logger.Info("scheduled cyclic job",
+                       "jobID", job.ID,
+                       "nextExecution", jobInfo.NextExecutionAt)
        } else {
                td.tempTasks.Store(job.ID, timeout)
-               td.logger.Info("added one-time job to scheduler", "jobID", 
job.ID, "delay", delay)
+               td.jobInfos.Store(job.ID, jobInfo)
+               td.logger.Info("scheduled one-time job",
+                       "jobID", job.ID,
+                       "nextExecution", jobInfo.NextExecutionAt)
        }
 
        return nil
@@ -107,13 +163,19 @@ func (td *TimeDispatch) AddJob(job *jobtypes.Job) error {
 
 // RemoveJob removes a job from the scheduler
 func (td *TimeDispatch) RemoveJob(jobID int64) error {
-       td.logger.Info("removing job from time dispatcher", "jobID", jobID)
+       td.logger.V(1).Info("removing job", "jobID", jobID)
+
+       // Update statistics
+       td.stats.mu.Lock()
+       td.stats.TotalJobsRemoved++
+       td.stats.mu.Unlock()
 
        // Try to remove from cyclic tasks
        if timeoutInterface, exists := td.cyclicTasks.LoadAndDelete(jobID); 
exists {
                if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
                        timeout.Cancel()
-                       td.logger.Info("removed cyclic job", "jobID", jobID)
+                       td.jobInfos.Delete(jobID)
+                       td.logger.V(1).Info("removed cyclic job", "jobID", 
jobID)
                        return nil
                }
        }
@@ -122,7 +184,8 @@ func (td *TimeDispatch) RemoveJob(jobID int64) error {
        if timeoutInterface, exists := td.tempTasks.LoadAndDelete(jobID); 
exists {
                if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
                        timeout.Cancel()
-                       td.logger.Info("removed one-time job", "jobID", jobID)
+                       td.jobInfos.Delete(jobID)
+                       td.logger.V(1).Info("removed one-time job", "jobID", 
jobID)
                        return nil
                }
        }
@@ -146,8 +209,11 @@ func (td *TimeDispatch) Start(ctx context.Context) error {
                return fmt.Errorf("failed to start wheel timer: %w", err)
        }
 
+       // Start periodic statistics logging
+       td.startStatsLogging(ctx)
+
        td.started = true
-       td.logger.Info("time dispatcher started successfully")
+       // Started successfully
        return nil
 }
 
@@ -162,6 +228,9 @@ func (td *TimeDispatch) Stop() error {
 
        td.logger.Info("stopping time dispatcher")
 
+       // Stop statistics logging
+       td.stopStatsLogging()
+
        // Cancel all running tasks
        td.cyclicTasks.Range(func(key, value interface{}) bool {
                if timeout, ok := value.(*jobtypes.Timeout); ok {
@@ -180,6 +249,7 @@ func (td *TimeDispatch) Stop() error {
        // Clear maps
        td.cyclicTasks = sync.Map{}
        td.tempTasks = sync.Map{}
+       td.jobInfos = sync.Map{}
 
        // Stop wheel timer
        if err := td.wheelTimer.Stop(); err != nil {
@@ -206,9 +276,230 @@ func (td *TimeDispatch) Stats() map[string]any {
                return true
        })
 
+       td.stats.mu.RLock()
+       uptime := time.Since(td.stats.StartTime)
+       totalJobsAdded := td.stats.TotalJobsAdded
+       totalJobsRemoved := td.stats.TotalJobsRemoved
+       totalJobsExecuted := td.stats.TotalJobsExecuted
+       totalJobsCompleted := td.stats.TotalJobsCompleted
+       totalJobsFailed := td.stats.TotalJobsFailed
+       lastExecutionTime := td.stats.LastExecutionTime
+       td.stats.mu.RUnlock()
+
        return map[string]any{
-               "cyclicJobs": cyclicCount,
-               "tempJobs":   tempCount,
-               "started":    td.started,
+               "cyclicJobs":         cyclicCount,
+               "tempJobs":           tempCount,
+               "started":            td.started,
+               "uptime":             uptime,
+               "totalJobsAdded":     totalJobsAdded,
+               "totalJobsRemoved":   totalJobsRemoved,
+               "totalJobsExecuted":  totalJobsExecuted,
+               "totalJobsCompleted": totalJobsCompleted,
+               "totalJobsFailed":    totalJobsFailed,
+               "lastExecutionTime":  lastExecutionTime,
+       }
+}
+
+// RecordJobExecution records job execution statistics
+func (td *TimeDispatch) RecordJobExecution() {
+       td.stats.mu.Lock()
+       td.stats.TotalJobsExecuted++
+       td.stats.LastExecutionTime = time.Now()
+       td.stats.mu.Unlock()
+}
+
+// RecordJobCompleted records job completion
+func (td *TimeDispatch) RecordJobCompleted() {
+       td.stats.mu.Lock()
+       td.stats.TotalJobsCompleted++
+       td.stats.mu.Unlock()
+}
+
+// RecordJobFailed records job failure
+func (td *TimeDispatch) RecordJobFailed() {
+       td.stats.mu.Lock()
+       td.stats.TotalJobsFailed++
+       td.stats.mu.Unlock()
+}
+
+// UpdateJobExecution updates job execution information
+func (td *TimeDispatch) UpdateJobExecution(jobID int64) {
+       if jobInfoInterface, exists := td.jobInfos.Load(jobID); exists {
+               if jobInfo, ok := jobInfoInterface.(*JobInfo); ok {
+                       now := time.Now()
+                       jobInfo.LastExecutedAt = now
+                       jobInfo.ExecutionCount++
+
+                       // Update next execution time for cyclic jobs
+                       if jobInfo.Job.IsCyclic && jobInfo.Job.DefaultInterval 
> 0 {
+                               jobInfo.NextExecutionAt = 
now.Add(time.Duration(jobInfo.Job.DefaultInterval) * time.Second)
+                       }
+               }
        }
 }
+
+// RescheduleJob reschedules a cyclic job for the next execution
+func (td *TimeDispatch) RescheduleJob(job *jobtypes.Job) error {
+       if !job.IsCyclic || job.DefaultInterval <= 0 {
+               return fmt.Errorf("job is not cyclable or has invalid interval")
+       }
+
+       // Create new timer task for next execution
+       nextDelay := time.Duration(job.DefaultInterval) * time.Second
+       timerTask := NewWheelTimerTask(job, td.commonDispatcher, td, td.logger)
+
+       // Schedule the next execution
+       timeout := td.wheelTimer.NewTimeout(timerTask, nextDelay)
+
+       // Update the timeout in cyclicTasks
+       td.cyclicTasks.Store(job.ID, timeout)
+
+       // Update job info next execution time
+       if jobInfoInterface, exists := td.jobInfos.Load(job.ID); exists {
+               if jobInfo, ok := jobInfoInterface.(*JobInfo); ok {
+                       jobInfo.NextExecutionAt = time.Now().Add(nextDelay)
+                       jobInfo.Timeout = timeout
+               }
+       }
+
+       nextExecutionTime := time.Now().Add(nextDelay)
+       td.logger.Info("rescheduled cyclic job",
+               "jobID", job.ID,
+               "interval", job.DefaultInterval,
+               "nextExecution", nextExecutionTime,
+               "delay", nextDelay)
+
+       return nil
+}
+
+// startStatsLogging starts periodic statistics logging
+func (td *TimeDispatch) startStatsLogging(ctx context.Context) {
+       // Create a context that can be cancelled
+       statsCtx, cancel := context.WithCancel(ctx)
+       td.statsCancel = cancel
+
+       // Create ticker for 1-minute intervals
+       td.statsLogger = time.NewTicker(1 * time.Minute)
+
+       go func() {
+               defer td.statsLogger.Stop()
+
+               // Log initial statistics
+               td.logStatistics()
+
+               for {
+                       select {
+                       case <-td.statsLogger.C:
+                               td.logStatistics()
+                       case <-statsCtx.Done():
+                               td.logger.Info("statistics logging stopped")
+                               return
+                       }
+               }
+       }()
+
+       td.logger.Info("statistics logging started", "interval", "1m")
+}
+
+// stopStatsLogging stops periodic statistics logging
+func (td *TimeDispatch) stopStatsLogging() {
+       if td.statsCancel != nil {
+               td.statsCancel()
+               td.statsCancel = nil
+       }
+       if td.statsLogger != nil {
+               td.statsLogger.Stop()
+               td.statsLogger = nil
+       }
+}
+
+// logStatistics logs current dispatcher statistics
+func (td *TimeDispatch) logStatistics() {
+       stats := td.Stats()
+
+       // Get detailed job information
+       var cyclicJobDetails []map[string]any
+       var tempJobDetails []map[string]any
+
+       td.cyclicTasks.Range(func(key, value interface{}) bool {
+               if jobID, ok := key.(int64); ok {
+                       if jobInfoInterface, exists := td.jobInfos.Load(jobID); 
exists {
+                               if jobInfo, ok := jobInfoInterface.(*JobInfo); 
ok {
+                                       now := time.Now()
+                                       detail := map[string]any{
+                                               "jobID":         jobID,
+                                               "app":           
jobInfo.Job.App,
+                                               "monitorID":     
jobInfo.Job.MonitorID,
+                                               "interval":      
jobInfo.Job.DefaultInterval,
+                                               "createdAt":     
jobInfo.CreatedAt,
+                                               "lastExecuted":  
jobInfo.LastExecutedAt,
+                                               "nextExecution": 
jobInfo.NextExecutionAt,
+                                               "execCount":     
jobInfo.ExecutionCount,
+                                               "timeSinceLastExec": func() 
string {
+                                                       if 
jobInfo.LastExecutedAt.IsZero() {
+                                                               return "never"
+                                                       }
+                                                       return 
now.Sub(jobInfo.LastExecutedAt).String()
+                                               }(),
+                                               "timeToNextExec": func() string 
{
+                                                       if 
jobInfo.NextExecutionAt.IsZero() {
+                                                               return "unknown"
+                                                       }
+                                                       if 
jobInfo.NextExecutionAt.Before(now) {
+                                                               return "overdue"
+                                                       }
+                                                       return 
jobInfo.NextExecutionAt.Sub(now).String()
+                                               }(),
+                                       }
+                                       cyclicJobDetails = 
append(cyclicJobDetails, detail)
+                               }
+                       }
+               }
+               return true
+       })
+
+       td.tempTasks.Range(func(key, value interface{}) bool {
+               if jobID, ok := key.(int64); ok {
+                       if jobInfoInterface, exists := td.jobInfos.Load(jobID); 
exists {
+                               if jobInfo, ok := jobInfoInterface.(*JobInfo); 
ok {
+                                       detail := map[string]any{
+                                               "jobID":         jobID,
+                                               "app":           
jobInfo.Job.App,
+                                               "monitorID":     
jobInfo.Job.MonitorID,
+                                               "createdAt":     
jobInfo.CreatedAt,
+                                               "nextExecution": 
jobInfo.NextExecutionAt,
+                                               "execCount":     
jobInfo.ExecutionCount,
+                                       }
+                                       tempJobDetails = append(tempJobDetails, 
detail)
+                               }
+                       }
+               }
+               return true
+       })
+
+       // Get timer wheel statistics
+       var timerStats map[string]interface{}
+       if td.wheelTimer != nil {
+               timerStats = td.wheelTimer.GetStats()
+       }
+
+       td.logger.Info("📊 Dispatcher Statistics Report",
+               "uptime", stats["uptime"],
+               "activeJobs", map[string]any{
+                       "cyclic": stats["cyclicJobs"],
+                       "temp":   stats["tempJobs"],
+                       "total":  stats["cyclicJobs"].(int) + 
stats["tempJobs"].(int),
+               },
+               "jobCounters", map[string]any{
+                       "added":     stats["totalJobsAdded"],
+                       "removed":   stats["totalJobsRemoved"],
+                       "executed":  stats["totalJobsExecuted"],
+                       "completed": stats["totalJobsCompleted"],
+                       "failed":    stats["totalJobsFailed"],
+               },
+               "timerWheelConfig", timerStats,
+               "lastExecution", stats["lastExecutionTime"],
+               "cyclicJobs", cyclicJobDetails,
+               "tempJobs", tempJobDetails,
+       )
+}
diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go 
b/internal/collector/common/dispatcher/wheel_timer_task.go
index 3eb53e3..657103a 100644
--- a/internal/collector/common/dispatcher/wheel_timer_task.go
+++ b/internal/collector/common/dispatcher/wheel_timer_task.go
@@ -28,19 +28,30 @@ import (
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
+// StatsRecorder interface for recording job execution statistics
+type StatsRecorder interface {
+       RecordJobExecution()
+       RecordJobCompleted()
+       RecordJobFailed()
+       UpdateJobExecution(jobID int64)
+       RescheduleJob(job *jobtypes.Job) error
+}
+
 // WheelTimerTask represents a task that runs in the time wheel
 // This corresponds to Java's WheelTimerTask
 type WheelTimerTask struct {
        job              *jobtypes.Job
        commonDispatcher MetricsTaskDispatcher
+       statsRecorder    StatsRecorder
        logger           logger.Logger
 }
 
 // NewWheelTimerTask creates a new wheel timer task
-func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher 
MetricsTaskDispatcher, logger logger.Logger) *WheelTimerTask {
+func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher 
MetricsTaskDispatcher, statsRecorder StatsRecorder, logger logger.Logger) 
*WheelTimerTask {
        return &WheelTimerTask{
                job:              job,
                commonDispatcher: commonDispatcher,
+               statsRecorder:    statsRecorder,
                logger:           logger.WithName("wheel-timer-task"),
        }
 }
@@ -53,11 +64,16 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) 
error {
                return fmt.Errorf("job is nil, cannot execute")
        }
 
-       wtt.logger.Info("executing wheel timer task",
+       wtt.logger.V(1).Info("executing task",
                "jobID", wtt.job.ID,
-               "monitorID", wtt.job.MonitorID,
                "app", wtt.job.App)
 
+       // Record job execution start
+       if wtt.statsRecorder != nil {
+               wtt.statsRecorder.RecordJobExecution()
+               wtt.statsRecorder.UpdateJobExecution(wtt.job.ID)
+       }
+
        startTime := time.Now()
 
        // Create a context for this specific task execution
@@ -74,13 +90,23 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) 
error {
                wtt.logger.Error(err, "failed to dispatch metrics task",
                        "jobID", wtt.job.ID,
                        "duration", duration)
+
+               // Record job failure
+               if wtt.statsRecorder != nil {
+                       wtt.statsRecorder.RecordJobFailed()
+               }
                return err
        }
 
-       wtt.logger.Info("successfully dispatched metrics task",
+       wtt.logger.V(1).Info("task completed",
                "jobID", wtt.job.ID,
                "duration", duration)
 
+       // Record job completion
+       if wtt.statsRecorder != nil {
+               wtt.statsRecorder.RecordJobCompleted()
+       }
+
        // If this is a cyclic job, reschedule it for the next execution
        if wtt.job.IsCyclic && wtt.job.DefaultInterval > 0 {
                wtt.rescheduleJob(timeout)
@@ -91,17 +117,20 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) 
error {
 
 // rescheduleJob reschedules a cyclic job for the next execution
 func (wtt *WheelTimerTask) rescheduleJob(timeout *jobtypes.Timeout) {
-       wtt.logger.Info("rescheduling cyclic job",
-               "jobID", wtt.job.ID,
-               "interval", wtt.job.DefaultInterval)
+       if wtt.job.DefaultInterval <= 0 {
+               wtt.logger.Info("job has no valid interval, not rescheduling",
+                       "jobID", wtt.job.ID)
+               return
+       }
 
-       // TODO: Implement rescheduling logic
-       // This would involve creating a new timeout with the same job
-       // and adding it back to the time wheel
-       // For now, we'll log that rescheduling is needed
-       wtt.logger.Info("cyclic job needs rescheduling",
-               "jobID", wtt.job.ID,
-               "nextExecutionIn", 
time.Duration(wtt.job.DefaultInterval)*time.Second)
+       // Use the stats recorder to reschedule the job
+       if wtt.statsRecorder != nil {
+               if err := wtt.statsRecorder.RescheduleJob(wtt.job); err != nil {
+                       wtt.logger.Error(err, "failed to reschedule job", 
"jobID", wtt.job.ID)
+               }
+       } else {
+               wtt.logger.Error(nil, "stats recorder is nil", "jobID", 
wtt.job.ID)
+       }
 }
 
 // GetJob returns the associated job
diff --git a/internal/collector/common/router/message_router.go 
b/internal/collector/common/router/message_router.go
new file mode 100644
index 0000000..262dff7
--- /dev/null
+++ b/internal/collector/common/router/message_router.go
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package router
+
+import (
+       "encoding/json"
+       "fmt"
+
+       pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// MessageType constants matching Java version
+const (
+       MessageTypeHeartbeat                int32 = 0
+       MessageTypeGoOnline                 int32 = 1
+       MessageTypeGoOffline                int32 = 2
+       MessageTypeGoClose                  int32 = 3
+       MessageTypeIssueCyclicTask          int32 = 4
+       MessageTypeDeleteCyclicTask         int32 = 5
+       MessageTypeIssueOneTimeTask         int32 = 6
+       MessageTypeResponseCyclicTaskData   int32 = 7
+       MessageTypeResponseOneTimeTaskData  int32 = 8
+       MessageTypeResponseCyclicTaskSdData int32 = 9
+)
+
+// MessageRouter is the interface for routing messages between transport and 
job components
+type MessageRouter interface {
+       // RegisterProcessors registers all message processors
+       RegisterProcessors()
+
+       // SendResult sends collection results back to the manager
+       SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) 
error
+
+       // GetIdentity returns the collector identity
+       GetIdentity() string
+}
+
+// MessageRouterImpl implements the MessageRouter interface
+type MessageRouterImpl struct {
+       logger    logger.Logger
+       client    transport.TransportClient
+       jobRunner *job.Runner
+       identity  string
+}
+
+// Config represents message router configuration
+type Config struct {
+       Logger    logger.Logger
+       Client    transport.TransportClient
+       JobRunner *job.Runner
+       Identity  string
+}
+
+// New creates a new message router
+func New(cfg *Config) MessageRouter {
+       return &MessageRouterImpl{
+               logger:    cfg.Logger.WithName("message-router"),
+               client:    cfg.Client,
+               jobRunner: cfg.JobRunner,
+               identity:  cfg.Identity,
+       }
+}
+
+// RegisterProcessors registers all message processors
+func (r *MessageRouterImpl) RegisterProcessors() {
+       if r.client == nil {
+               r.logger.Error(nil, "cannot register processors: client is nil")
+               return
+       }
+
+       // Register cyclic task processor
+       r.client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg 
interface{}) (interface{}, error) {
+               if pbMsg, ok := msg.(*pb.Message); ok {
+                       return r.handleIssueCyclicTask(pbMsg)
+               }
+               return nil, fmt.Errorf("invalid message type")
+       })
+
+       // Register delete cyclic task processor
+       r.client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg 
interface{}) (interface{}, error) {
+               if pbMsg, ok := msg.(*pb.Message); ok {
+                       return r.handleDeleteCyclicTask(pbMsg)
+               }
+               return nil, fmt.Errorf("invalid message type")
+       })
+
+       // Register one-time task processor
+       r.client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg 
interface{}) (interface{}, error) {
+               if pbMsg, ok := msg.(*pb.Message); ok {
+                       return r.handleIssueOneTimeTask(pbMsg)
+               }
+               return nil, fmt.Errorf("invalid message type")
+       })
+
+       // Other processors can be registered here
+       r.logger.Info("Successfully registered all message processors")
+}
+
+// handleIssueCyclicTask handles cyclic task messages
+func (r *MessageRouterImpl) handleIssueCyclicTask(msg *pb.Message) 
(*pb.Message, error) {
+       r.logger.Info("Handling cyclic task message")
+
+       // Parse job from message
+       var job jobtypes.Job
+       if err := json.Unmarshal(msg.Msg, &job); err != nil {
+               r.logger.Error(err, "failed to unmarshal job")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  r.identity,
+                       Msg:       []byte("failed to unmarshal job"),
+               }, nil
+       }
+
+       // Add job to job runner
+       if err := r.jobRunner.AddAsyncCollectJob(&job); err != nil {
+               r.logger.Error(err, "failed to add job to job runner", "jobID", 
job.ID)
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  r.identity,
+                       Msg:       []byte(fmt.Sprintf("failed to add job: %v", 
err)),
+               }, nil
+       }
+
+       return &pb.Message{
+               Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+               Direction: pb.Direction_RESPONSE,
+               Identity:  r.identity,
+               Msg:       []byte("job added successfully"),
+       }, nil
+}
+
+// handleDeleteCyclicTask handles delete cyclic task messages
+func (r *MessageRouterImpl) handleDeleteCyclicTask(msg *pb.Message) 
(*pb.Message, error) {
+       r.logger.Info("Handling delete cyclic task message")
+
+       // Parse job IDs from message
+       var jobIDs []int64
+       if err := json.Unmarshal(msg.Msg, &jobIDs); err != nil {
+               r.logger.Error(err, "failed to unmarshal job IDs")
+               return &pb.Message{
+                       Type:      pb.MessageType_DELETE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  r.identity,
+                       Msg:       []byte("failed to unmarshal job IDs"),
+               }, nil
+       }
+
+       // Remove jobs from job runner
+       for _, jobID := range jobIDs {
+               if err := r.jobRunner.RemoveAsyncCollectJob(jobID); err != nil {
+                       r.logger.Error(err, "failed to remove job from job 
runner", "jobID", jobID)
+                       // Continue with other jobs even if one fails
+               }
+       }
+
+       return &pb.Message{
+               Type:      pb.MessageType_DELETE_CYCLIC_TASK,
+               Direction: pb.Direction_RESPONSE,
+               Identity:  r.identity,
+               Msg:       []byte("jobs removed successfully"),
+       }, nil
+}
+
+// handleIssueOneTimeTask handles one-time task messages
+func (r *MessageRouterImpl) handleIssueOneTimeTask(msg *pb.Message) 
(*pb.Message, error) {
+       r.logger.Info("Handling one-time task message")
+
+       // Parse job from message
+       var job jobtypes.Job
+       if err := json.Unmarshal(msg.Msg, &job); err != nil {
+               r.logger.Error(err, "failed to unmarshal job")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  r.identity,
+                       Msg:       []byte("failed to unmarshal job"),
+               }, nil
+       }
+
+       // Set job as non-cyclic
+       job.IsCyclic = false
+
+       // Add job to job runner
+       if err := r.jobRunner.AddAsyncCollectJob(&job); err != nil {
+               r.logger.Error(err, "failed to add one-time job to job runner", 
"jobID", job.ID)
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  r.identity,
+                       Msg:       []byte(fmt.Sprintf("failed to add one-time 
job: %v", err)),
+               }, nil
+       }
+
+       return &pb.Message{
+               Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+               Direction: pb.Direction_RESPONSE,
+               Identity:  r.identity,
+               Msg:       []byte("one-time job added successfully"),
+       }, nil
+}
+
+// SendResult sends collection results back to the manager
+func (r *MessageRouterImpl) SendResult(data *jobtypes.CollectRepMetricsData, 
job *jobtypes.Job) error {
+       if r.client == nil || !r.client.IsStarted() {
+               return fmt.Errorf("transport client not started")
+       }
+
+       // Determine message type based on job type
+       var msgType pb.MessageType
+       if job.IsCyclic {
+               msgType = pb.MessageType_RESPONSE_CYCLIC_TASK_DATA
+       } else {
+               msgType = pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA
+       }
+
+       // Serialize metrics data
+       dataBytes, err := json.Marshal([]jobtypes.CollectRepMetricsData{*data})
+       if err != nil {
+               return fmt.Errorf("failed to marshal metrics data: %w", err)
+       }
+
+       // Create message
+       msg := &pb.Message{
+               Type:      msgType,
+               Direction: pb.Direction_REQUEST,
+               Identity:  r.identity,
+               Msg:       dataBytes,
+       }
+
+       // Send message
+       if err := r.client.SendMsg(msg); err != nil {
+               return fmt.Errorf("failed to send metrics data: %w", err)
+       }
+
+       r.logger.Info("Successfully sent metrics data",
+               "jobID", job.ID,
+               "metricsName", data.Metrics,
+               "isCyclic", job.IsCyclic)
+
+       return nil
+}
+
+// GetIdentity returns the collector identity
+func (r *MessageRouterImpl) GetIdentity() string {
+       return r.identity
+}
diff --git a/internal/collector/common/transport/transport.go 
b/internal/collector/common/transport/transport.go
index ee1da89..44058e3 100644
--- a/internal/collector/common/transport/transport.go
+++ b/internal/collector/common/transport/transport.go
@@ -45,8 +45,9 @@ const (
 )
 
 type Runner struct {
-       Config *configtypes.CollectorConfig
-       client transport.TransportClient
+       Config       *configtypes.CollectorConfig
+       client       transport.TransportClient
+       jobScheduler transport.JobScheduler
        clrserver.Server
 }
 
@@ -59,6 +60,11 @@ func New(cfg *configtypes.CollectorConfig) *Runner {
        }
 }
 
+// SetJobScheduler sets the job scheduler for the transport runner
+func (r *Runner) SetJobScheduler(scheduler transport.JobScheduler) {
+       r.jobScheduler = scheduler
+}
+
 // NewFromConfig creates a new transport runner from collector configuration
 func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner {
        if cfg == nil {
@@ -150,7 +156,14 @@ func (r *Runner) Start(ctx context.Context) error {
                                r.Logger.Error(event.Error, "Failed to connect 
to manager gRPC server", "addr", event.Address)
                        }
                })
-               transport.RegisterDefaultProcessors(c)
+               // Register processors, attaching the job scheduler when available
+               if r.jobScheduler != nil {
+                       transport.RegisterDefaultProcessors(c, r.jobScheduler)
+                       r.Logger.Info("Registered gRPC processors with job 
scheduler")
+               } else {
+                       transport.RegisterDefaultProcessors(c, nil)
+                       r.Logger.Info("Registered gRPC processors without job 
scheduler")
+               }
        case *transport.NettyClient:
                c.SetEventHandler(func(event transport.Event) {
                        switch event.Type {
@@ -163,7 +176,14 @@ func (r *Runner) Start(ctx context.Context) error {
                                r.Logger.Error(event.Error, "Failed to connect 
to manager netty server", "addr", event.Address)
                        }
                })
-               transport.RegisterDefaultNettyProcessors(c)
+               // Register processors, attaching the job scheduler when available
+               if r.jobScheduler != nil {
+                       transport.RegisterDefaultNettyProcessors(c, 
r.jobScheduler)
+                       r.Logger.Info("Registered netty processors with job 
scheduler")
+               } else {
+                       transport.RegisterDefaultNettyProcessors(c, nil)
+                       r.Logger.Info("Registered netty processors without job 
scheduler")
+               }
        }
 
        if err := r.client.Start(); err != nil {
diff --git a/internal/collector/common/types/job/job_types.go 
b/internal/collector/common/types/job/job_types.go
index 28478b8..70c3c10 100644
--- a/internal/collector/common/types/job/job_types.go
+++ b/internal/collector/common/types/job/job_types.go
@@ -30,9 +30,9 @@ type Job struct {
        Hide                bool              `json:"hide"`
        Category            string            `json:"category"`
        App                 string            `json:"app"`
-       Name                map[string]string `json:"name"`
-       Help                map[string]string `json:"help"`
-       HelpLink            map[string]string `json:"helpLink"`
+       Name                interface{}       `json:"name"`     // Can be 
string or map[string]string for i18n
+       Help                interface{}       `json:"help"`     // Can be 
string or map[string]string for i18n
+       HelpLink            interface{}       `json:"helpLink"` // Can be 
string or map[string]string for i18n
        Timestamp           int64             `json:"timestamp"`
        DefaultInterval     int64             `json:"defaultInterval"`
        Intervals           []int64           `json:"intervals"`
@@ -41,7 +41,10 @@ type Job struct {
        Metrics             []Metrics         `json:"metrics"`
        Configmap           []Configmap       `json:"configmap"`
        IsSd                bool              `json:"isSd"`
+       Sd                  bool              `json:"sd"`
        PrometheusProxyMode bool              `json:"prometheusProxyMode"`
+       Cyclic              bool              `json:"cyclic"`
+       Interval            int64             `json:"interval"`
 
        // Internal fields
        EnvConfigmaps    map[string]Configmap `json:"-"`
diff --git a/internal/collector/common/types/job/metrics_types.go 
b/internal/collector/common/types/job/metrics_types.go
index 6127238..04f035e 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/collector/common/types/job/metrics_types.go
@@ -17,29 +17,37 @@
 
 package job
 
-import "time"
+import (
+       "fmt"
+       "time"
+)
 
 // Metrics represents a metric configuration
 type Metrics struct {
        Name        string            `json:"name"`
+       I18n        interface{}       `json:"i18n,omitempty"` // 
Internationalization info
        Priority    int               `json:"priority"`
+       CollectTime int64             `json:"collectTime"`
+       Interval    int64             `json:"interval"`
+       Visible     *bool             `json:"visible"`
        Fields      []Field           `json:"fields"`
        Aliasfields []string          `json:"aliasfields"`
-       Calculates  []Calculate       `json:"calculates"`
-       Units       []Unit            `json:"units"`
+       AliasFields []string          `json:"aliasFields"` // Alternative field 
name
+       Calculates  interface{}       `json:"calculates"`  // Can be 
[]Calculate or []string
+       Filters     interface{}       `json:"filters"`
+       Units       interface{}       `json:"units"` // Can be []Unit or 
[]string
        Protocol    string            `json:"protocol"`
        Host        string            `json:"host"`
        Port        string            `json:"port"`
        Timeout     string            `json:"timeout"`
-       Interval    int64             `json:"interval"`
        Range       string            `json:"range"`
-       Visible     *bool             `json:"visible"`
        ConfigMap   map[string]string `json:"configMap"`
+       HasSubTask  bool              `json:"hasSubTask"`
 
        // Protocol specific fields
        HTTP    *HTTPProtocol    `json:"http,omitempty"`
        SSH     *SSHProtocol     `json:"ssh,omitempty"`
-       JDBC    *JDBCProtocol    `json:"jdbc,omitempty"`
+       JDBC    interface{}      `json:"jdbc,omitempty"` // Can be JDBCProtocol 
or map[string]interface{}
        SNMP    *SNMPProtocol    `json:"snmp,omitempty"`
        JMX     *JMXProtocol     `json:"jmx,omitempty"`
        Redis   *RedisProtocol   `json:"redis,omitempty"`
@@ -54,6 +62,7 @@ type Field struct {
        Unit     string      `json:"unit"`
        Instance bool        `json:"instance"`
        Value    interface{} `json:"value"`
+       I18n     interface{} `json:"i18n,omitempty"` // Internationalization 
info
 }
 
 // Calculate represents a calculation configuration
@@ -71,8 +80,10 @@ type Unit struct {
 
 // ParamDefine represents a parameter definition
 type ParamDefine struct {
+       ID           interface{} `json:"id"`
+       App          interface{} `json:"app"`
        Field        string      `json:"field"`
-       Name         string      `json:"name"`
+       Name         interface{} `json:"name"` // Can be string or 
map[string]string for i18n
        Type         string      `json:"type"`
        Required     bool        `json:"required"`
        DefaultValue interface{} `json:"defaultValue"`
@@ -80,8 +91,38 @@ type ParamDefine struct {
        Range        string      `json:"range"`
        Limit        int         `json:"limit"`
        Options      []Option    `json:"options"`
-       Depend       *Depend     `json:"depend"`
+       KeyAlias     interface{} `json:"keyAlias"`
+       ValueAlias   interface{} `json:"valueAlias"`
        Hide         bool        `json:"hide"`
+       Creator      interface{} `json:"creator"`
+       Modifier     interface{} `json:"modifier"`
+       GmtCreate    interface{} `json:"gmtCreate"`
+       GmtUpdate    interface{} `json:"gmtUpdate"`
+       Depend       *Depend     `json:"depend"`
+}
+
+// GetName returns the name as string, handling both string and i18n map cases
+func (p *ParamDefine) GetName() string {
+       switch v := p.Name.(type) {
+       case string:
+               return v
+       case map[string]interface{}:
+               // Try to get English name first, then any available language
+               if name, ok := v["en"]; ok {
+                       if s, ok := name.(string); ok {
+                               return s
+                       }
+               }
+               // Fall back to first available name
+               for _, val := range v {
+                       if s, ok := val.(string); ok {
+                               return s
+                       }
+               }
+               return "unknown"
+       default:
+               return fmt.Sprintf("%v", v)
+       }
 }
 
 // Option represents a parameter option
@@ -156,18 +197,18 @@ type SSHProtocol struct {
 
 // JDBCProtocol represents JDBC protocol configuration
 type JDBCProtocol struct {
-       Host            string     `json:"host"`
-       Port            string     `json:"port"`
-       Platform        string     `json:"platform"`
-       Username        string     `json:"username"`
-       Password        string     `json:"password"`
-       Database        string     `json:"database"`
-       Timeout         string     `json:"timeout"`
-       QueryType       string     `json:"queryType"`
-       SQL             string     `json:"sql"`
-       URL             string     `json:"url"`
-       ReuseConnection string     `json:"reuseConnection"`
-       SSHTunnel       *SSHTunnel `json:"sshTunnel,omitempty"`
+       Host            string                 `json:"host"`
+       Port            string                 `json:"port"`
+       Platform        string                 `json:"platform"`
+       Username        string                 `json:"username"`
+       Password        string                 `json:"password"`
+       Database        string                 `json:"database"`
+       Timeout         string                 `json:"timeout"`
+       QueryType       string                 `json:"queryType"`
+       SQL             string                 `json:"sql"`
+       URL             string                 `json:"url"`
+       ReuseConnection string                 `json:"reuseConnection"`
+       SSHTunnel       map[string]interface{} `json:"sshTunnel,omitempty"`
 }
 
 // SNMPProtocol represents SNMP protocol configuration
@@ -270,7 +311,35 @@ func (j *Job) Clone() *Job {
 
        if j.Metrics != nil {
                clone.Metrics = make([]Metrics, len(j.Metrics))
-               copy(clone.Metrics, j.Metrics)
+               for i, metric := range j.Metrics {
+                       clone.Metrics[i] = metric
+
+                       // Deep copy ConfigMap for each metric to avoid 
concurrent access
+                       if metric.ConfigMap != nil {
+                               clone.Metrics[i].ConfigMap = 
make(map[string]string, len(metric.ConfigMap))
+                               for k, v := range metric.ConfigMap {
+                                       clone.Metrics[i].ConfigMap[k] = v
+                               }
+                       }
+
+                       // Deep copy Fields slice
+                       if metric.Fields != nil {
+                               clone.Metrics[i].Fields = make([]Field, 
len(metric.Fields))
+                               copy(clone.Metrics[i].Fields, metric.Fields)
+                       }
+
+                       // Deep copy Aliasfields slice
+                       if metric.Aliasfields != nil {
+                               clone.Metrics[i].Aliasfields = make([]string, 
len(metric.Aliasfields))
+                               copy(clone.Metrics[i].Aliasfields, 
metric.Aliasfields)
+                       }
+
+                       // Deep copy AliasFields slice
+                       if metric.AliasFields != nil {
+                               clone.Metrics[i].AliasFields = make([]string, 
len(metric.AliasFields))
+                               copy(clone.Metrics[i].AliasFields, 
metric.AliasFields)
+                       }
+               }
        }
 
        if j.Configmap != nil {
diff --git a/internal/transport/netty_client.go 
b/internal/transport/netty_client.go
index d598149..ebaeade 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -503,7 +503,7 @@ func (f *TransportClientFactory) CreateClient(protocol, 
addr string) (TransportC
 }
 
 // RegisterDefaultNettyProcessors registers all default message processors for 
 Netty client
-func RegisterDefaultNettyProcessors(client *NettyClient) {
+func RegisterDefaultNettyProcessors(client *NettyClient, scheduler 
JobScheduler) {
        client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{}) 
(interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
                        processor := &HeartbeatProcessor{}
@@ -538,7 +538,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
 
        client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := &CollectCyclicDataProcessor{client: nil}
+                       processor := &CollectCyclicDataProcessor{client: nil, 
scheduler: scheduler}
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
@@ -546,7 +546,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
 
        client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := &DeleteCyclicTaskProcessor{client: nil}
+                       processor := &DeleteCyclicTaskProcessor{client: nil, 
scheduler: scheduler}
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
@@ -554,7 +554,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
 
        client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := &CollectOneTimeDataProcessor{client: nil}
+                       processor := &CollectOneTimeDataProcessor{client: nil, 
scheduler: scheduler}
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index 8bd71e3..b9df9a3 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -18,11 +18,18 @@
 package transport
 
 import (
+       "encoding/json"
        "fmt"
        "log"
+       "os"
        "time"
 
        pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
 )
 
 // Message type constants matching Java version
@@ -44,6 +51,23 @@ type MessageProcessor interface {
        Process(msg *pb.Message) (*pb.Message, error)
 }
 
+// JobScheduler defines the interface for job scheduling operations
+// This interface allows the transport layer to interact with job scheduling 
without direct dependencies
+type JobScheduler interface {
+       // AddAsyncCollectJob adds a job to async collection scheduling
+       AddAsyncCollectJob(job *jobtypes.Job) error
+
+       // RemoveAsyncCollectJob removes a job from scheduling
+       RemoveAsyncCollectJob(jobID int64) error
+}
+
+// preprocessJobPasswords is a helper function to decrypt passwords in job 
configmap
+// This should be called once when receiving a job to avoid repeated decryption
+func preprocessJobPasswords(job *jobtypes.Job) error {
+       replacer := param.NewReplacer()
+       return replacer.PreprocessJobPasswords(job)
+}
+
 // HeartbeatProcessor handles heartbeat messages
 type HeartbeatProcessor struct{}
 
@@ -63,7 +87,31 @@ func NewGoOnlineProcessor(client *GrpcClient) 
*GoOnlineProcessor {
 }
 
 func (p *GoOnlineProcessor) Process(msg *pb.Message) (*pb.Message, error) {
-       // Handle go online message
+       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelInfo).WithName("go-online-processor")
+
+       // Handle go online message - parse ServerInfo and extract AES secret
+       log.Info("received GO_ONLINE message from manager")
+
+       if len(msg.Msg) == 0 {
+               log.Info("empty message from server, please upgrade server")
+       } else {
+               // Parse ServerInfo from message (matches Java: 
JsonUtil.fromJson(message.getMsg().toStringUtf8(), ServerInfo.class))
+               var serverInfo struct {
+                       AesSecret string `json:"aesSecret"`
+               }
+
+               if err := json.Unmarshal(msg.Msg, &serverInfo); err != nil {
+                       log.Error(err, "failed to parse ServerInfo from 
manager")
+               } else if serverInfo.AesSecret == "" {
+                       log.Info("server response has empty secret, please 
check configuration")
+               } else {
+                       // Set the AES secret key globally (matches Java: 
AesUtil.setDefaultSecretKey(serverInfo.getAesSecret()))
+                       log.Info("received AES secret from manager", 
"keyLength", len(serverInfo.AesSecret))
+                       crypto.SetDefaultSecretKey(serverInfo.AesSecret)
+               }
+       }
+
+       log.Info("online message processed successfully")
        return &pb.Message{
                Type:      pb.MessageType_GO_ONLINE,
                Direction: pb.Direction_RESPONSE,
@@ -128,65 +176,238 @@ func (p *GoCloseProcessor) Process(msg *pb.Message) 
(*pb.Message, error) {
 
 // CollectCyclicDataProcessor handles cyclic task messages
 type CollectCyclicDataProcessor struct {
-       client *GrpcClient
+       client    *GrpcClient
+       scheduler JobScheduler
 }
 
-func NewCollectCyclicDataProcessor(client *GrpcClient) 
*CollectCyclicDataProcessor {
-       return &CollectCyclicDataProcessor{client: client}
+func NewCollectCyclicDataProcessor(client *GrpcClient, scheduler JobScheduler) 
*CollectCyclicDataProcessor {
+       return &CollectCyclicDataProcessor{
+               client:    client,
+               scheduler: scheduler,
+       }
 }
 
 func (p *CollectCyclicDataProcessor) Process(msg *pb.Message) (*pb.Message, 
error) {
-       // Handle cyclic task message
-       // TODO: Implement actual task processing logic
+       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelInfo).WithName("cyclic-task-processor")
+       log.Info("processing cyclic task message", "identity", msg.Identity)
+
+       if p.scheduler == nil {
+               log.Info("no job scheduler available, cannot process cyclic 
task")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte("no job scheduler available"),
+               }, nil
+       }
+
+       // Parse job from message
+       log.Info("Received cyclic task JSON: %s", string(msg.Msg))
+
+       var job jobtypes.Job
+       if err := json.Unmarshal(msg.Msg, &job); err != nil {
+               log.Error(err, "Failed to unmarshal job from cyclic task 
message: %v")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte(fmt.Sprintf("failed to parse job: 
%v", err)),
+               }, nil
+       }
+
+       // Preprocess passwords once (decrypt encrypted passwords in configmap)
+       // This avoids repeated decryption during each task execution
+       if err := preprocessJobPasswords(&job); err != nil {
+               log.Error(err, "Failed to preprocess job passwords")
+       }
+
+       // Ensure job is marked as cyclic
+       job.IsCyclic = true
+
+       log.Info("Adding cyclic job to scheduler",
+               "jobID", job.ID,
+               "monitorID", job.MonitorID,
+               "app", job.App)
+
+       // Add job to scheduler
+       if err := p.scheduler.AddAsyncCollectJob(&job); err != nil {
+               log.Error(err, "Failed to add cyclic job to scheduler: %v")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte(fmt.Sprintf("failed to schedule job: 
%v", err)),
+               }, nil
+       }
+
        return &pb.Message{
                Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
                Direction: pb.Direction_RESPONSE,
                Identity:  msg.Identity,
-               Msg:       []byte("cyclic task ack"),
+               Msg:       []byte("cyclic task scheduled successfully"),
        }, nil
 }
 
 // DeleteCyclicTaskProcessor handles delete cyclic task messages
 type DeleteCyclicTaskProcessor struct {
-       client *GrpcClient
+       client    *GrpcClient
+       scheduler JobScheduler
 }
 
-func NewDeleteCyclicTaskProcessor(client *GrpcClient) 
*DeleteCyclicTaskProcessor {
-       return &DeleteCyclicTaskProcessor{client: client}
+func NewDeleteCyclicTaskProcessor(client *GrpcClient, scheduler JobScheduler) 
*DeleteCyclicTaskProcessor {
+       return &DeleteCyclicTaskProcessor{
+               client:    client,
+               scheduler: scheduler,
+       }
 }
 
 func (p *DeleteCyclicTaskProcessor) Process(msg *pb.Message) (*pb.Message, 
error) {
-       // Handle delete cyclic task message
+       log.Printf("Processing delete cyclic task message, identity: %s", 
msg.Identity)
+
+       if p.scheduler == nil {
+               log.Printf("No job scheduler available, cannot process delete 
cyclic task")
+               return &pb.Message{
+                       Type:      pb.MessageType_DELETE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte("no job scheduler available"),
+               }, nil
+       }
+
+       // Parse job IDs from message - could be a single job ID or array of IDs
+       var jobIDs []int64
+
+       // Try to parse as array first
+       if err := json.Unmarshal(msg.Msg, &jobIDs); err != nil {
+               // If array parsing fails, try single job ID
+               var singleJobID int64
+               if err2 := json.Unmarshal(msg.Msg, &singleJobID); err2 != nil {
+                       log.Printf("Failed to unmarshal job IDs from delete 
message: %v, %v", err, err2)
+                       return &pb.Message{
+                               Type:      pb.MessageType_DELETE_CYCLIC_TASK,
+                               Direction: pb.Direction_RESPONSE,
+                               Identity:  msg.Identity,
+                               Msg:       []byte(fmt.Sprintf("failed to parse 
job IDs: %v", err)),
+                       }, nil
+               }
+               jobIDs = []int64{singleJobID}
+       }
+
+       log.Printf("Removing %d cyclic jobs from scheduler: %v", len(jobIDs), 
jobIDs)
+
+       // Remove jobs from scheduler
+       var errors []string
+       for _, jobID := range jobIDs {
+               if err := p.scheduler.RemoveAsyncCollectJob(jobID); err != nil {
+                       log.Printf("Failed to remove job %d from scheduler: 
%v", jobID, err)
+                       errors = append(errors, fmt.Sprintf("job %d: %v", 
jobID, err))
+               } else {
+                       log.Printf("Successfully removed job %d from 
scheduler", jobID)
+               }
+       }
+
+       // Prepare response message
+       var responseMsg string
+       if len(errors) > 0 {
+               responseMsg = fmt.Sprintf("partially completed, errors: %v", 
errors)
+       } else {
+               responseMsg = "all cyclic tasks removed successfully"
+       }
+
        return &pb.Message{
                Type:      pb.MessageType_DELETE_CYCLIC_TASK,
                Direction: pb.Direction_RESPONSE,
                Identity:  msg.Identity,
-               Msg:       []byte("delete cyclic task ack"),
+               Msg:       []byte(responseMsg),
        }, nil
 }
 
 // CollectOneTimeDataProcessor handles one-time task messages
 type CollectOneTimeDataProcessor struct {
-       client *GrpcClient
+       client    *GrpcClient
+       scheduler JobScheduler
 }
 
-func NewCollectOneTimeDataProcessor(client *GrpcClient) 
*CollectOneTimeDataProcessor {
-       return &CollectOneTimeDataProcessor{client: client}
+func NewCollectOneTimeDataProcessor(client *GrpcClient, scheduler 
JobScheduler) *CollectOneTimeDataProcessor {
+       return &CollectOneTimeDataProcessor{
+               client:    client,
+               scheduler: scheduler,
+       }
 }
 
 func (p *CollectOneTimeDataProcessor) Process(msg *pb.Message) (*pb.Message, 
error) {
-       // Handle one-time task message
-       // TODO: Implement actual task processing logic
+       log.Printf("Processing one-time task message, identity: %s", 
msg.Identity)
+
+       if p.scheduler == nil {
+               log.Printf("No job scheduler available, cannot process one-time 
task")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte("no job scheduler available"),
+               }, nil
+       }
+
+       // Parse job from message
+       log.Printf("Received one-time task JSON: %s", string(msg.Msg))
+
+       // Parse JSON to check interval values before unmarshaling
+       var rawJob map[string]interface{}
+       if err := json.Unmarshal(msg.Msg, &rawJob); err == nil {
+               if defaultInterval, ok := rawJob["defaultInterval"]; ok {
+                       log.Printf("DEBUG: Raw defaultInterval from JSON: %v 
(type: %T)", defaultInterval, defaultInterval)
+               }
+               if interval, ok := rawJob["interval"]; ok {
+                       log.Printf("DEBUG: Raw interval from JSON: %v (type: 
%T)", interval, interval)
+               }
+       }
+
+       var job jobtypes.Job
+       if err := json.Unmarshal(msg.Msg, &job); err != nil {
+               log.Printf("Failed to unmarshal job from one-time task message: 
%v", err)
+               log.Printf("JSON content: %s", string(msg.Msg))
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte(fmt.Sprintf("failed to parse job: 
%v", err)),
+               }, nil
+       }
+
+       // Preprocess passwords once (decrypt encrypted passwords in configmap)
+       // This avoids repeated decryption during each task execution
+       if err := preprocessJobPasswords(&job); err != nil {
+               log.Printf("Failed to preprocess job passwords: %v", err)
+       }
+
+       // Ensure job is marked as non-cyclic
+       job.IsCyclic = false
+
+       log.Printf("Adding one-time job to scheduler: jobID=%d, monitorID=%d, 
app=%s",
+               job.ID, job.MonitorID, job.App)
+
+       // Add job to scheduler
+       if err := p.scheduler.AddAsyncCollectJob(&job); err != nil {
+               log.Printf("Failed to add one-time job to scheduler: %v", err)
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte(fmt.Sprintf("failed to schedule job: 
%v", err)),
+               }, nil
+       }
+
+       log.Printf("Successfully scheduled one-time job: jobID=%d", job.ID)
        return &pb.Message{
                Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
                Direction: pb.Direction_RESPONSE,
                Identity:  msg.Identity,
-               Msg:       []byte("one-time task ack"),
+               Msg:       []byte("one-time task scheduled successfully"),
        }, nil
 }
 
 // RegisterDefaultProcessors registers all default message processors
-func RegisterDefaultProcessors(client *GrpcClient) {
+func RegisterDefaultProcessors(client *GrpcClient, scheduler JobScheduler) {
        client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{}) 
(interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
                        processor := &HeartbeatProcessor{}
@@ -221,7 +442,7 @@ func RegisterDefaultProcessors(client *GrpcClient) {
 
        client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := NewCollectCyclicDataProcessor(client)
+                       processor := NewCollectCyclicDataProcessor(client, 
scheduler)
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
@@ -229,7 +450,7 @@ func RegisterDefaultProcessors(client *GrpcClient) {
 
        client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := NewDeleteCyclicTaskProcessor(client)
+                       processor := NewDeleteCyclicTaskProcessor(client, 
scheduler)
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
@@ -237,7 +458,7 @@ func RegisterDefaultProcessors(client *GrpcClient) {
 
        client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := NewCollectOneTimeDataProcessor(client)
+                       processor := NewCollectOneTimeDataProcessor(client, 
scheduler)
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
diff --git a/internal/util/crypto/aes_util.go b/internal/util/crypto/aes_util.go
new file mode 100644
index 0000000..678760f
--- /dev/null
+++ b/internal/util/crypto/aes_util.go
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package crypto
+
+import (
+       "crypto/aes"
+       "crypto/cipher"
+       "encoding/base64"
+       "fmt"
+       "os"
+       "sync"
+       "unicode"
+
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// AESUtil provides AES encryption/decryption utilities
+// This matches Java's AesUtil functionality
+type AESUtil struct {
+       secretKey string
+       mutex     sync.RWMutex
+}
+
+var (
+       defaultAESUtil = &AESUtil{}
+       once           sync.Once
+       log            = logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelInfo).WithName("aes-util")
+)
+
+// GetDefaultAESUtil returns the singleton AES utility instance
+func GetDefaultAESUtil() *AESUtil {
+       once.Do(func() {
+               defaultAESUtil = &AESUtil{}
+       })
+       return defaultAESUtil
+}
+
+// SetDefaultSecretKey sets the default AES secret key (matches Java: 
AesUtil.setDefaultSecretKey)
+func SetDefaultSecretKey(secretKey string) {
+       GetDefaultAESUtil().SetSecretKey(secretKey)
+}
+
+// GetDefaultSecretKey gets the current AES secret key (matches Java: 
AesUtil.getDefaultSecretKey)
+func GetDefaultSecretKey() string {
+       return GetDefaultAESUtil().GetSecretKey()
+}
+
+// SetSecretKey sets the AES secret key for this instance
+func (a *AESUtil) SetSecretKey(secretKey string) {
+       a.mutex.Lock()
+       defer a.mutex.Unlock()
+       a.secretKey = secretKey
+       log.Info("AES secret key updated", "keyLength", len(secretKey))
+}
+
+// GetSecretKey gets the AES secret key for this instance
+func (a *AESUtil) GetSecretKey() string {
+       a.mutex.RLock()
+       defer a.mutex.RUnlock()
+       return a.secretKey
+}
+
+// AesDecode decrypts AES encrypted data (matches Java: AesUtil.aesDecode)
+func (a *AESUtil) AesDecode(encryptedData string) (string, error) {
+       secretKey := a.GetSecretKey()
+       if secretKey == "" {
+               return "", fmt.Errorf("AES secret key not set")
+       }
+       return a.AesDecodeWithKey(encryptedData, secretKey)
+}
+
+// AesDecodeWithKey decrypts AES encrypted data with specified key
+func (a *AESUtil) AesDecodeWithKey(encryptedData, key string) (string, error) {
+       // Ensure key is exactly 16 bytes for AES-128 (Java requirement)
+       keyBytes := []byte(key)
+       if len(keyBytes) != 16 {
+               return "", fmt.Errorf("key must be exactly 16 bytes, got %d", 
len(keyBytes))
+       }
+
+       // Decode base64 encoded data
+       encryptedBytes, err := base64.StdEncoding.DecodeString(encryptedData)
+       if err != nil {
+               return "", fmt.Errorf("failed to decode base64: %w", err)
+       }
+
+       // Create AES cipher
+       block, err := aes.NewCipher(keyBytes)
+       if err != nil {
+               return "", fmt.Errorf("failed to create AES cipher: %w", err)
+       }
+
+       // Check if data length is valid for AES
+       if len(encryptedBytes) < aes.BlockSize || 
len(encryptedBytes)%aes.BlockSize != 0 {
+               return "", fmt.Errorf("invalid encrypted data length: %d", 
len(encryptedBytes))
+       }
+
+       // Java's AesUtil reuses the key bytes as the CBC IV; replicated here for compatibility (note: a key-derived IV weakens CBC security)
+       iv := keyBytes
+
+       // Create CBC decrypter
+       mode := cipher.NewCBCDecrypter(block, iv)
+
+       // Decrypt
+       decrypted := make([]byte, len(encryptedBytes))
+       mode.CryptBlocks(decrypted, encryptedBytes)
+
+       // Remove PKCS5/PKCS7 padding (Go's PKCS7 is compatible with Java's 
PKCS5 for AES)
+       decrypted, err = removePKCS7Padding(decrypted)
+       if err != nil {
+               return "", fmt.Errorf("failed to remove padding: %w", err)
+       }
+
+       return string(decrypted), nil
+}
+
+// IsCiphertext reports whether text decrypts successfully with the current key
+// — a heuristic, not proof of encryption (matches Java: AesUtil.isCiphertext)
+func (a *AESUtil) IsCiphertext(text string) bool {
+       // First check if it's valid base64
+       if _, err := base64.StdEncoding.DecodeString(text); err != nil {
+               return false
+       }
+
+       // Try to decrypt and see if it succeeds
+       _, err := a.AesDecode(text)
+       return err == nil
+}
+
+// removePKCS7Padding removes PKCS7 padding from decrypted data
+func removePKCS7Padding(data []byte) ([]byte, error) {
+       if len(data) == 0 {
+               return data, nil
+       }
+
+       paddingLen := int(data[len(data)-1])
+       if paddingLen > len(data) || paddingLen == 0 {
+               return data, nil // Return as-is if padding seems invalid
+       }
+
+       // Verify padding
+       for i := 0; i < paddingLen; i++ {
+               if data[len(data)-1-i] != byte(paddingLen) {
+                       return data, nil // Return as-is if padding is invalid
+               }
+       }
+
+       return data[:len(data)-paddingLen], nil
+}
+
+// isPrintableString checks if a string contains only printable characters
+func isPrintableString(s string) bool {
+       if len(s) == 0 {
+               return false
+       }
+
+       for _, r := range s {
+               if !unicode.IsPrint(r) && !unicode.IsSpace(r) {
+                       return false
+               }
+       }
+       return true
+}
+
+// Convenience functions that match Java's static methods
+
+// AesDecode decrypts with the default secret key
+func AesDecode(encryptedData string) (string, error) {
+       return GetDefaultAESUtil().AesDecode(encryptedData)
+}
+
+// AesDecodeWithKey decrypts with specified key
+func AesDecodeWithKey(encryptedData, key string) (string, error) {
+       return GetDefaultAESUtil().AesDecodeWithKey(encryptedData, key)
+}
+
+// IsCiphertext checks if text is encrypted using default key
+func IsCiphertext(text string) bool {
+       return GetDefaultAESUtil().IsCiphertext(text)
+}
diff --git a/internal/util/param/param_replacer.go 
b/internal/util/param/param_replacer.go
new file mode 100644
index 0000000..ff6a944
--- /dev/null
+++ b/internal/util/param/param_replacer.go
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package param
+
+import (
+       "encoding/json"
+       "fmt"
+       "os"
+       "strconv"
+       "strings"
+
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// Replacer is a standalone utility for parameter replacement
+// This matches Java's parameter substitution mechanism where ^_^paramName^_^ 
gets replaced with actual values
+type Replacer struct{}
+
+// NewReplacer creates a new parameter replacer
+func NewReplacer() *Replacer {
+       return &Replacer{}
+}
+
+// PreprocessJobPasswords decrypts passwords in job configmap once during job 
creation
+// This permanently replaces encrypted passwords with decrypted ones in the 
job configmap
+func (r *Replacer) PreprocessJobPasswords(job *jobtypes.Job) error {
+       if job == nil || job.Configmap == nil {
+               return nil
+       }
+
+       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelDebug).WithName("password-preprocessor")
+
+       // Decrypt passwords in configmap once and permanently replace them
+       for i := range job.Configmap {
+               config := &job.Configmap[i] // Get pointer to modify in place
+               if config.Type == 2 {       // password type
+                       if encryptedValue, ok := config.Value.(string); ok {
+                               log.Sugar().Debugf("preprocessing encrypted 
password for key: %s", config.Key)
+                               if decoded, err := 
r.decryptPassword(encryptedValue); err == nil {
+                                       log.Info("password preprocessing 
successful", "key", config.Key, "length", len(decoded))
+                                       config.Value = decoded // Permanently 
replace encrypted value with decrypted value
+                                       config.Type = 1        // Change type 
to string since it's now decrypted
+                               } else {
+                                       log.Error(err, "password preprocessing 
failed, keeping original value", "key", config.Key)
+                               }
+                       }
+               }
+       }
+       return nil
+}
+
+// ReplaceJobParams replaces all parameter placeholders in job configuration
+func (r *Replacer) ReplaceJobParams(job *jobtypes.Job) (*jobtypes.Job, error) {
+       if job == nil {
+               return nil, fmt.Errorf("job is nil")
+       }
+
+       // Create parameter map from configmap (passwords should already be 
decrypted)
+       paramMap := r.createParamMapSimple(job.Configmap)
+
+       // Clone the job to avoid modifying original
+       clonedJob := job.Clone()
+       if clonedJob == nil {
+               return nil, fmt.Errorf("failed to clone job")
+       }
+
+       // Replace parameters in all metrics
+       for i := range clonedJob.Metrics {
+               if err := r.replaceMetricsParams(&clonedJob.Metrics[i], 
paramMap); err != nil {
+                       return nil, fmt.Errorf("failed to replace params in 
metrics %s: %w", clonedJob.Metrics[i].Name, err)
+               }
+       }
+
+       return clonedJob, nil
+}
+
+// createParamMapSimple creates a parameter map from configmap entries
+// Assumes passwords have already been decrypted by PreprocessJobPasswords
+func (r *Replacer) createParamMapSimple(configmap []jobtypes.Configmap) 
map[string]string {
+       paramMap := make(map[string]string)
+
+       for _, config := range configmap {
+               // Convert value to string, handle null values as empty strings
+               var strValue string
+               if config.Value == nil {
+                       strValue = "" // null values become empty strings
+               } else {
+                       switch config.Type {
+                       case 0: // number
+                               if numVal, ok := config.Value.(float64); ok {
+                                       strValue = strconv.FormatFloat(numVal, 
'f', -1, 64)
+                               } else if intVal, ok := config.Value.(int); ok {
+                                       strValue = strconv.Itoa(intVal)
+                               } else {
+                                       strValue = fmt.Sprintf("%v", 
config.Value)
+                               }
+                       case 1, 2: // string or password (both are now plain 
strings after preprocessing)
+                               strValue = fmt.Sprintf("%v", config.Value)
+                       default:
+                               strValue = fmt.Sprintf("%v", config.Value)
+                       }
+               }
+               paramMap[config.Key] = strValue
+       }
+
+       return paramMap
+}
+
+// createParamMap creates a parameter map from configmap entries (legacy 
version with decryption)
+// Deprecated: Use PreprocessJobPasswords + createParamMapSimple instead
+func (r *Replacer) createParamMap(configmap []jobtypes.Configmap) 
map[string]string {
+       paramMap := make(map[string]string)
+
+       for _, config := range configmap {
+               // Convert value to string based on type, handle null values as 
empty strings
+               var strValue string
+               if config.Value == nil {
+                       strValue = "" // null values become empty strings
+               } else {
+                       switch config.Type {
+                       case 0: // number
+                               if numVal, ok := config.Value.(float64); ok {
+                                       strValue = strconv.FormatFloat(numVal, 
'f', -1, 64)
+                               } else if intVal, ok := config.Value.(int); ok {
+                                       strValue = strconv.Itoa(intVal)
+                               } else {
+                                       strValue = fmt.Sprintf("%v", 
config.Value)
+                               }
+                       case 1: // string
+                               strValue = fmt.Sprintf("%v", config.Value)
+                       case 2: // password (encrypted)
+                               if encryptedValue, ok := config.Value.(string); 
ok {
+                                       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelDebug).WithName("param-replacer")
+                                       log.Sugar().Debugf("attempting to 
decrypt password: %s", encryptedValue)
+                                       if decoded, err := 
r.decryptPassword(encryptedValue); err == nil {
+                                               log.Info("password decryption 
successful", "length", len(decoded))
+                                               strValue = decoded
+                                       } else {
+                                               log.Error(err, "password 
decryption failed, using original value")
+                                               // Fallback to original value 
if decryption fails
+                                               strValue = encryptedValue
+                                       }
+                               } else {
+                                       strValue = fmt.Sprintf("%v", 
config.Value)
+                               }
+                       default:
+                               strValue = fmt.Sprintf("%v", config.Value)
+                       }
+               }
+               paramMap[config.Key] = strValue
+       }
+
+       return paramMap
+}
+
+// replaceMetricsParams replaces parameters in metrics configuration
+func (r *Replacer) replaceMetricsParams(metrics *jobtypes.Metrics, paramMap 
map[string]string) error {
+       // Replace parameters in protocol-specific configurations
+       // Currently only JDBC is defined as interface{} in the struct 
definition
+       // Other protocols are concrete struct pointers and don't contain 
placeholders from JSON
+       if metrics.JDBC != nil {
+               if err := r.replaceProtocolParams(&metrics.JDBC, paramMap); err 
!= nil {
+                       return fmt.Errorf("failed to replace JDBC params: %w", 
err)
+               }
+       }
+
+       // TODO: Add other protocol configurations as they are converted to 
interface{} types
+       // For now, other protocols (HTTP, SSH, etc.) are concrete struct 
pointers and
+       // don't require parameter replacement
+
+       // Replace parameters in basic metrics fields
+       if err := r.replaceBasicMetricsParams(metrics, paramMap); err != nil {
+               return fmt.Errorf("failed to replace basic metrics params: %w", 
err)
+       }
+
+       return nil
+}
+
+// replaceProtocolParams replaces parameters in any protocol configuration
+func (r *Replacer) replaceProtocolParams(protocolInterface *interface{}, 
paramMap map[string]string) error {
+       if *protocolInterface == nil {
+               return nil
+       }
+
+       // Convert protocol interface{} to map for manipulation
+       protocolMap, ok := (*protocolInterface).(map[string]interface{})
+       if !ok {
+               // If it's already processed or not a map, skip
+               return nil
+       }
+
+       // Recursively replace parameters in all string values
+       return r.replaceParamsInMap(protocolMap, paramMap)
+}
+
+// replaceParamsInMap recursively replaces parameters in a map structure
+func (r *Replacer) replaceParamsInMap(data map[string]interface{}, paramMap 
map[string]string) error {
+       for key, value := range data {
+               switch v := value.(type) {
+               case string:
+                       // Replace parameters in string values
+                       data[key] = r.replaceParamPlaceholders(v, paramMap)
+               case map[string]interface{}:
+                       // Recursively handle nested maps
+                       if err := r.replaceParamsInMap(v, paramMap); err != nil 
{
+                               return fmt.Errorf("failed to replace params in 
nested map %s: %w", key, err)
+                       }
+               case []interface{}:
+                       // Handle arrays
+                       for i, item := range v {
+                               if itemMap, ok := 
item.(map[string]interface{}); ok {
+                                       if err := r.replaceParamsInMap(itemMap, 
paramMap); err != nil {
+                                               return fmt.Errorf("failed to 
replace params in array item %d: %w", i, err)
+                                       }
+                               } else if itemStr, ok := item.(string); ok {
+                                       v[i] = 
r.replaceParamPlaceholders(itemStr, paramMap)
+                               }
+                       }
+               }
+       }
+       return nil
+}
+
+// replaceBasicMetricsParams replaces parameters in basic metrics fields
+func (r *Replacer) replaceBasicMetricsParams(metrics *jobtypes.Metrics, 
paramMap map[string]string) error {
+       // Replace parameters in basic string fields
+       metrics.Host = r.replaceParamPlaceholders(metrics.Host, paramMap)
+       metrics.Port = r.replaceParamPlaceholders(metrics.Port, paramMap)
+       metrics.Timeout = r.replaceParamPlaceholders(metrics.Timeout, paramMap)
+       metrics.Range = r.replaceParamPlaceholders(metrics.Range, paramMap)
+
+       // Replace parameters in ConfigMap
+       if metrics.ConfigMap != nil {
+               for key, value := range metrics.ConfigMap {
+                       metrics.ConfigMap[key] = 
r.replaceParamPlaceholders(value, paramMap)
+               }
+       }
+
+       return nil
+}
+
+// replaceParamPlaceholders replaces ^_^paramName^_^ placeholders with actual 
values
+func (r *Replacer) replaceParamPlaceholders(template string, paramMap 
map[string]string) string {
+       // Guard against empty templates to prevent strings.ReplaceAll issues
+       if template == "" {
+               return ""
+       }
+
+       result := template
+
+       // Find all ^_^paramName^_^ patterns and replace them
+       for paramName, paramValue := range paramMap {
+               // Guard against empty parameter names
+               if paramName == "" {
+                       continue
+               }
+               placeholder := fmt.Sprintf("^_^%s^_^", paramName)
+               result = strings.ReplaceAll(result, placeholder, paramValue)
+       }
+
+       return result
+}
+
+// ExtractProtocolConfig extracts and processes protocol configuration after 
parameter replacement
+func (r *Replacer) ExtractProtocolConfig(protocolInterface interface{}, 
targetStruct interface{}) error {
+       if protocolInterface == nil {
+               return fmt.Errorf("protocol interface is nil")
+       }
+
+       // If it's a map (from JSON parsing), convert to target struct
+       if protocolMap, ok := protocolInterface.(map[string]interface{}); ok {
+               // Convert map to JSON and then unmarshal to target struct
+               jsonData, err := json.Marshal(protocolMap)
+               if err != nil {
+                       return fmt.Errorf("failed to marshal protocol config: 
%w", err)
+               }
+
+               if err := json.Unmarshal(jsonData, targetStruct); err != nil {
+                       return fmt.Errorf("failed to unmarshal protocol config: 
%w", err)
+               }
+
+               return nil
+       }
+
+       return fmt.Errorf("unsupported protocol config type: %T", 
protocolInterface)
+}
+
+// ExtractJDBCConfig extracts and processes JDBC configuration after parameter 
replacement
+func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{}) 
(*jobtypes.JDBCProtocol, error) {
+       if jdbcInterface == nil {
+               return nil, nil
+       }
+
+       // If it's already a JDBCProtocol struct
+       if jdbcConfig, ok := jdbcInterface.(*jobtypes.JDBCProtocol); ok {
+               return jdbcConfig, nil
+       }
+
+       // Use the generic extraction method
+       var jdbcConfig jobtypes.JDBCProtocol
+       if err := r.ExtractProtocolConfig(jdbcInterface, &jdbcConfig); err != 
nil {
+               return nil, fmt.Errorf("failed to extract JDBC config: %w", err)
+       }
+
+       return &jdbcConfig, nil
+}
+
+// ExtractHTTPConfig extracts and processes HTTP configuration after parameter 
replacement
+func (r *Replacer) ExtractHTTPConfig(httpInterface interface{}) 
(*jobtypes.HTTPProtocol, error) {
+       if httpInterface == nil {
+               return nil, nil
+       }
+
+       // If it's already an HTTPProtocol struct
+       if httpConfig, ok := httpInterface.(*jobtypes.HTTPProtocol); ok {
+               return httpConfig, nil
+       }
+
+       // Use the generic extraction method
+       var httpConfig jobtypes.HTTPProtocol
+       if err := r.ExtractProtocolConfig(httpInterface, &httpConfig); err != 
nil {
+               return nil, fmt.Errorf("failed to extract HTTP config: %w", err)
+       }
+
+       return &httpConfig, nil
+}
+
+// ExtractSSHConfig extracts and processes SSH configuration after parameter 
replacement
+func (r *Replacer) ExtractSSHConfig(sshInterface interface{}) 
(*jobtypes.SSHProtocol, error) {
+       if sshInterface == nil {
+               return nil, nil
+       }
+
+       // If it's already an SSHProtocol struct
+       if sshConfig, ok := sshInterface.(*jobtypes.SSHProtocol); ok {
+               return sshConfig, nil
+       }
+
+       // Use the generic extraction method
+       var sshConfig jobtypes.SSHProtocol
+       if err := r.ExtractProtocolConfig(sshInterface, &sshConfig); err != nil 
{
+               return nil, fmt.Errorf("failed to extract SSH config: %w", err)
+       }
+
+       return &sshConfig, nil
+}
+
+// decryptPassword decrypts an encrypted password using AES
+// This implements the same algorithm as Java manager's AESUtil
+func (r *Replacer) decryptPassword(encryptedPassword string) (string, error) {
+       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelDebug).WithName("password-decrypt")
+
+       // Use the AES utility (matches Java: AesUtil.aesDecode)
+       if result, err := crypto.AesDecode(encryptedPassword); err == nil {
+               log.Info("password decrypted successfully", "length", 
len(result))
+               return result, nil
+       } else {
+               log.Sugar().Debugf("primary decryption failed: %v", err)
+       }
+
+       // Fallback: try with default Java key if no manager key is set
+       defaultKey := "tomSun28HaHaHaHa"
+       if result, err := crypto.AesDecodeWithKey(encryptedPassword, 
defaultKey); err == nil {
+               log.Info("password decrypted with default key", "length", 
len(result))
+               return result, nil
+       }
+
+       // If all decryption attempts fail, return original value to allow 
system to continue
+       log.Info("all decryption attempts failed, using original value")
+       return encryptedPassword, nil
+}
diff --git a/tools/docker/hcg/Dockerfile b/tools/docker/hcg/Dockerfile
deleted file mode 100644
index 2bb553d..0000000
--- a/tools/docker/hcg/Dockerfile
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-FROM 
docker.io/library/busybox@sha256:ab33eacc8251e3807b85bb6dba570e4698c3998eca6f0fc2ccb60575a563ea74
 AS builder
-
-# prepare hertzbeat data dir
-RUN mkdir -p /var/hertzbeat
-
-# Use distroless as minimal base image to package the manager binary
-# Refer to https://github.com/GoogleContainerTools/distroless for more details
-FROM 
gcr.io/distroless/base-nossl:nonroot@sha256:8981b63f968e829d21351ea9d28cc21127e5f034707f1d8483d2993d9577be0b
-
-COPY --from=builder /var/hertzbeat/ /var/hertzbeat/
-
-# copy binary to image, run make build to generate binary.
-COPY bin /usr/local/bin/
-COPY etc /var/hertzbeat/config/
-
-USER 65532:65532
-
-ENTRYPOINT ["/usr/local/bin/collector", "server", "--config", 
"/var/hertzbeat/config/hertzbeat-collector.yaml"]
diff --git a/tools/make/golang.mk b/tools/make/golang.mk
index a67ec75..0fe356f 100644
--- a/tools/make/golang.mk
+++ b/tools/make/golang.mk
@@ -18,46 +18,37 @@
 VERSION_PACKAGE := 
hertzbeat.apache.org/hertzbeat-collector-go/internal/cmd/version
 
 GO_LDFLAGS += -X $(VERSION_PACKAGE).hcgVersion=$(shell cat VERSION) \
--X $(VERSION_PACKAGE).gitCommitID=$(GIT_COMMIT)
+       -X $(VERSION_PACKAGE).gitCommitID=$(GIT_COMMIT)
 
 GIT_COMMIT:=$(shell git rev-parse HEAD)
 
-GOOS   ?= $(shell go env GOOS)
-GOARCH ?= $(shell go env GOARCH)
-
 ##@ Golang
 
 .PHONY: fmt
 fmt: ## Golang fmt
-       @$(LOG_TARGET)
        go fmt ./...
 
 .PHONY: vet
 vet: ## Golang vet
-       @$(LOG_TARGET)
        go vet ./...
 
 .PHONY: dev
 dev: ## Golang dev, run main by run.
-       @$(LOG_TARGET)
        go run cmd/main.go server --config etc/hertzbeat-collector.yaml
 
 .PHONY: prod
 prod: ## Golang prod, run bin by run.
-       @$(LOG_TARGET)
        bin/collector server --config etc/hertzbeat-collector.yaml
 
 .PHONY: build
-build: GOOS = linux
+# build
 build: ## Golang build
-       @$(LOG_TARGET)
        @version=$$(cat VERSION); \
        # todo; 添加交叉编译支持
-       CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) go build -o bin/collector 
-ldflags "$(GO_LDFLAGS)" cmd/main.go
+       CGO_ENABLED=0 go build -o bin/collector -ldflags "$(GO_LDFLAGS)" 
cmd/main.go
 
 .PHONY: init
 init: ## install base. For proto compile.
-       @$(LOG_TARGET)
        go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
        go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
        go install github.com/go-kratos/kratos/cmd/protoc-gen-go-http/v2@latest
@@ -67,7 +58,6 @@ init: ## install base. For proto compile.
 # generate api proto
 api: API_PROTO_FILES := $(wildcard api/*.proto)
 api: ## compile api proto files
-       @$(LOG_TARGET)
        protoc --proto_path=./api \
               --go_out=paths=source_relative:./api \
               --go-http_out=paths=source_relative:./api \
@@ -76,10 +66,12 @@ api: ## compile api proto files
 
 .PHONY: go-lint
 go-lint: ## run golang lint
-       @$(LOG_TARGET)
        golangci-lint run --config tools/linter/golang-ci/.golangci.yml
 
 .PHONY: test
 test: ## run golang test
-       @$(LOG_TARGET)
        go test -v ./...
+
+.PHONY: golang-all
+golang-all: ## run fmt lint vet build api test
+golang-all: fmt go-lint vet build api test
diff --git a/tools/make/image.mk b/tools/make/image.mk
deleted file mode 100644
index fc37001..0000000
--- a/tools/make/image.mk
+++ /dev/null
@@ -1,58 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# This is a wrapper to build and push docker image
-#
-
-# All make targets related to docker image are defined in this file.
-
-REGISTRY ?= docker.io
-
-TAG ?= $(shell git rev-parse HEAD)
-
-DOCKER := docker
-DOCKER_SUPPORTED_API_VERSION ?= 1.32
-
-IMAGES_DIR ?= $(wildcard tools/docker/hcg)
-
-IMAGES ?= hertzbeat-collector-go
-IMAGE_PLATFORMS ?= amd64 arm64
-
-BUILDX_CONTEXT = hcg-build-tools-builder
-
-##@ Image
-
-# todo: multi-platform build
-
-.PHONY: image-build
-image-build: ## Build docker image
-image-build: IMAGE_PLATFORMS = ${shell uname -m}
-image-build:
-       @$(LOG_TARGET)
-       make build
-       $(DOCKER) buildx create --name $(BUILDX_CONTEXT) --use; \
-       $(DOCKER) buildx use $(BUILDX_CONTEXT); \
-       $(DOCKER) buildx build --load \
-        -t $(REGISTRY)/${IMAGES}:$(TAG) \
-        --platform linux/${IMAGE_PLATFORMS} \
-        --file $(IMAGES_DIR)/Dockerfile . ; \
-        $(DOCKER) buildx rm $(BUILDX_CONTEXT)
-
-.PHONY: image-push
-image-push: ## Push docker image
-image-push:
-       @$(LOG_TARGET)
-       $(DOCKER) push $(REGISTRY)/$${image}:$(TAG)-$${platform}; \


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to