This is an automated email from the ASF dual-hosted git repository.

shown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 25ecbd2  feat: implement SSH collector with proxy support and optimize 
job scheduling (#25)
25ecbd2 is described below

commit 25ecbd23e04ead5feaef840385372341f3cfbebe
Author: Yang Yexuan <[email protected]>
AuthorDate: Fri Oct 17 22:22:33 2025 +0800

    feat: implement SSH collector with proxy support and optimize job 
scheduling (#25)
---
 go.mod                                             |   2 +-
 go.sum                                             |   2 +
 .../collector/basic/database/jdbc_collector.go     |  15 +-
 internal/collector/basic/init.go                   |   5 +
 internal/collector/basic/ssh/.keep                 |   0
 internal/collector/basic/ssh/ssh_collector.go      | 390 +++++++++++++++++++++
 .../common/collect/dispatch/metrics_collector.go   |   6 +-
 .../common/collect/lazy_message_router.go          |  44 +--
 .../collector/common/collect/result_handler.go     |   4 +-
 .../common/dispatcher/common_dispatcher.go         |   1 +
 internal/collector/common/ssh/.keep                |   0
 internal/collector/common/ssh/ssh_helper.go        | 224 ++++++++++++
 internal/collector/common/types/job/job_types.go   |  94 ++++-
 .../collector/common/types/job/metrics_types.go    |  38 +-
 internal/constants/const.go                        |  10 +
 internal/util/param/param_replacer.go              |   6 +
 16 files changed, 768 insertions(+), 73 deletions(-)

diff --git a/go.mod b/go.mod
index 88dc66f..6115950 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
        github.com/spf13/cobra v1.10.1
        github.com/stretchr/testify v1.10.0
        go.uber.org/zap v1.27.0
+       golang.org/x/crypto v0.39.0
        google.golang.org/grpc v1.75.0
        google.golang.org/protobuf v1.36.8
        gopkg.in/yaml.v3 v3.0.1
@@ -41,7 +42,6 @@ require (
        github.com/spf13/pflag v1.0.9 // indirect
        github.com/zeebo/xxh3 v1.0.2 // indirect
        go.uber.org/multierr v1.11.0 // indirect
-       golang.org/x/crypto v0.39.0 // indirect
        golang.org/x/mod v0.25.0 // indirect
        golang.org/x/net v0.41.0 // indirect
        golang.org/x/sync v0.15.0 // indirect
diff --git a/go.sum b/go.sum
index 1e8a909..e914d56 100644
--- a/go.sum
+++ b/go.sum
@@ -121,6 +121,8 @@ golang.org/x/sync v0.15.0/go.mod 
h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
 golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
 golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg=
+golang.org/x/term v0.32.0/go.mod 
h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ=
 golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
 golang.org/x/text v0.26.0/go.mod 
h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
 golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc=
diff --git a/internal/collector/basic/database/jdbc_collector.go 
b/internal/collector/basic/database/jdbc_collector.go
index dd74b2f..86f01be 100644
--- a/internal/collector/basic/database/jdbc_collector.go
+++ b/internal/collector/basic/database/jdbc_collector.go
@@ -29,6 +29,7 @@ import (
        _ "github.com/lib/pq"
        _ "github.com/microsoft/go-mssqldb"
        jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
 )
@@ -50,10 +51,10 @@ const (
        PlatformOracle     = "oracle"
 
        // Response codes
-       CodeSuccess       = 200
-       CodeFail          = 500
-       CodeUnReachable   = 503
-       CodeUnConnectable = 502
+       CodeSuccess       = constants.CollectSuccess
+       CodeFail          = constants.CollectFail
+       CodeUnReachable   = constants.CollectUnReachable
+       CodeUnConnectable = constants.CollectUnConnectable
 )
 
 // JDBCCollector implements JDBC database collection
@@ -178,11 +179,11 @@ func (jc *JDBCCollector) Collect(metrics 
*jobtypes.Metrics) *jobtypes.CollectRep
 
        switch jdbcConfig.QueryType {
        case QueryTypeOneRow:
-               err = jc.queryOneRow(db, jdbcConfig.SQL, metrics.Aliasfields, 
response)
+               err = jc.queryOneRow(db, jdbcConfig.SQL, metrics.AliasFields, 
response)
        case QueryTypeMultiRow:
-               err = jc.queryMultiRow(db, jdbcConfig.SQL, metrics.Aliasfields, 
response)
+               err = jc.queryMultiRow(db, jdbcConfig.SQL, metrics.AliasFields, 
response)
        case QueryTypeColumns:
-               err = jc.queryColumns(db, jdbcConfig.SQL, metrics.Aliasfields, 
response)
+               err = jc.queryColumns(db, jdbcConfig.SQL, metrics.AliasFields, 
response)
        case QueryTypeRunScript:
                err = jc.runScript(db, jdbcConfig.SQL, response)
        default:
diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go
index f4a7504..68a282e 100644
--- a/internal/collector/basic/init.go
+++ b/internal/collector/basic/init.go
@@ -21,6 +21,7 @@ package basic
 
 import (
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/ssh"
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
@@ -34,6 +35,10 @@ func init() {
                return database.NewJDBCCollector(logger)
        })
 
+       strategy.RegisterFactory("ssh", func(logger logger.Logger) 
strategy.Collector {
+               return ssh.NewSSHCollector(logger)
+       })
+
        // More protocols can be added here in the future:
        // strategy.RegisterFactory("http", func(logger logger.Logger) 
strategy.Collector {
        //     return http.NewHTTPCollector(logger)
diff --git a/internal/collector/basic/ssh/.keep 
b/internal/collector/basic/ssh/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/basic/ssh/ssh_collector.go 
b/internal/collector/basic/ssh/ssh_collector.go
new file mode 100644
index 0000000..42734e2
--- /dev/null
+++ b/internal/collector/basic/ssh/ssh_collector.go
@@ -0,0 +1,390 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package ssh
+
+import (
+       "bytes"
+       "context"
+       "fmt"
+       "net"
+       "strconv"
+       "strings"
+       "time"
+
+       sshhelper 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/ssh"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
+)
+
+const (
+       ProtocolSSH = "ssh"
+
+       // Special fields
+       ResponseTime = "responseTime"
+       NullValue    = "&nbsp;"
+
+       // Parse types
+       ParseTypeOneRow   = "oneRow"   // Each line corresponds to one field 
value, in order of aliasFields
+       ParseTypeMultiRow = "multiRow" // First line is header with field 
names, subsequent lines are data rows
+)
+
+// SSHCollector implements SSH collection
+type SSHCollector struct {
+       logger logger.Logger
+}
+
+// NewSSHCollector creates a new SSH collector
+func NewSSHCollector(logger logger.Logger) *SSHCollector {
+       return &SSHCollector{
+               logger: logger.WithName("ssh-collector"),
+       }
+}
+
+// extractSSHConfig extracts SSH configuration from interface{} type
+// This function uses the parameter replacer for consistent configuration 
extraction
+func extractSSHConfig(sshInterface interface{}) (*jobtypes.SSHProtocol, error) 
{
+       replacer := param.NewReplacer()
+       return replacer.ExtractSSHConfig(sshInterface)
+}
+
+// PreCheck validates the SSH metrics configuration
+func (jc *SSHCollector) PreCheck(metrics *jobtypes.Metrics) error {
+       if metrics == nil {
+               return fmt.Errorf("metrics is nil")
+       }
+
+       if metrics.SSH == nil {
+               return fmt.Errorf("SSH protocol configuration is required")
+       }
+
+       // Extract SSH configuration
+       sshConfig, err := extractSSHConfig(metrics.SSH)
+       if err != nil {
+               return fmt.Errorf("invalid SSH configuration: %w", err)
+       }
+       if sshConfig == nil {
+               return fmt.Errorf("SSH configuration is required")
+       }
+
+       // Validate required fields
+       if sshConfig.Host == "" {
+               return fmt.Errorf("host is required")
+       }
+       if sshConfig.Port == "" {
+               return fmt.Errorf("port is required")
+       }
+       if sshConfig.Username == "" {
+               return fmt.Errorf("username is required")
+       }
+       // Either password or private key must be provided for authentication
+       if sshConfig.Password == "" && sshConfig.PrivateKey == "" {
+               return fmt.Errorf("either password or privateKey is required 
for authentication")
+       }
+       if sshConfig.Script == "" {
+               return fmt.Errorf("script is required")
+       }
+       // parseScript and timeout are optional, will use defaults if not 
provided
+
+       return nil
+}
+
+// Collect performs SSH metrics collection
+func (sshc *SSHCollector) Collect(metrics *jobtypes.Metrics) 
*jobtypes.CollectRepMetricsData {
+       startTime := time.Now()
+
+       // Extract SSH configuration
+       sshConfig, err := extractSSHConfig(metrics.SSH)
+       if err != nil {
+               sshc.logger.Error(err, "failed to extract SSH config")
+               return sshc.createFailResponse(metrics, consts.CollectFail, 
fmt.Sprintf("SSH config error: %v", err))
+       }
+       if sshConfig == nil {
+               return sshc.createFailResponse(metrics, consts.CollectFail, 
"SSH configuration is required")
+       }
+
+       // Debug level only for collection start
+       sshc.logger.V(1).Info("starting SSH collection",
+               "host", sshConfig.Host,
+               "port", sshConfig.Port)
+
+       // Get timeout
+       timeout := sshc.getTimeout(sshConfig.Timeout)
+
+       // Execute SSH script
+       result, err := sshc.executeSSHScript(sshConfig, timeout)
+       if err != nil {
+               sshc.logger.Error(err, "failed to execute SSH script")
+               return sshc.createFailResponse(metrics, 
consts.CollectUnConnectable, fmt.Sprintf("SSH execution error: %v", err))
+       }
+
+       // Calculate response time
+       responseTime := time.Since(startTime).Milliseconds()
+
+       // Create success response with result
+       response := sshc.createSuccessResponseWithResult(metrics, result, 
responseTime)
+
+       return response
+}
+
+// Protocol returns the protocol this collector supports
+func (sshc *SSHCollector) Protocol() string {
+       return ProtocolSSH
+}
+
+// getTimeout parses timeout string and returns duration
+func (sshc *SSHCollector) getTimeout(timeoutStr string) time.Duration {
+       if timeoutStr == "" {
+               return 30 * time.Second // default timeout
+       }
+
+       // First try parsing as duration string (e.g., "10s", "5m", "500ms")
+       if duration, err := time.ParseDuration(timeoutStr); err == nil {
+               return duration
+       }
+
+       if timeout, err := strconv.Atoi(timeoutStr); err == nil {
+               return time.Duration(timeout) * time.Millisecond
+       }
+
+       return 30 * time.Second // fallback to default
+}
+
+// executeSSHScript executes the SSH script and returns the result
+func (sshc *SSHCollector) executeSSHScript(config *jobtypes.SSHProtocol, 
timeout time.Duration) (string, error) {
+       // Create SSH client configuration using helper function
+       clientConfig, err := sshhelper.CreateSSHClientConfig(config, 
sshc.logger)
+       if err != nil {
+               return "", fmt.Errorf("failed to create SSH client config: %w", 
err)
+       }
+
+       // Set timeout for the SSH connection
+       clientConfig.Timeout = timeout
+
+       // Create context with timeout for the entire operation
+       ctx, cancel := context.WithTimeout(context.Background(), timeout)
+       defer cancel()
+
+       // Use helper function to dial with context (with proxy support)
+       conn, err := sshhelper.DialWithProxy(ctx, config, clientConfig, 
sshc.logger)
+       if err != nil {
+               address := net.JoinHostPort(config.Host, config.Port)
+               return "", fmt.Errorf("failed to connect to SSH server %s: %w", 
address, err)
+       }
+       defer conn.Close()
+
+       // Create a session
+       session, err := conn.NewSession()
+       if err != nil {
+               return "", fmt.Errorf("failed to create SSH session: %w", err)
+       }
+       defer session.Close()
+
+       // Execute the script and capture output
+       var stdout bytes.Buffer
+       var stderr bytes.Buffer
+       session.Stdout = &stdout
+       session.Stderr = &stderr
+
+       // Execute the command
+       err = session.Run(config.Script)
+       if err != nil {
+               stderrStr := stderr.String()
+               if stderrStr != "" {
+                       return "", fmt.Errorf("script execution failed: %w, 
stderr: %s", err, stderrStr)
+               }
+               return "", fmt.Errorf("script execution failed: %w", err)
+       }
+       sshc.logger.Info("SSH script executed successfully")
+       sshc.logger.Info("SSH script output", "output", stdout.String())
+
+       return stdout.String(), nil
+}
+
+// createFailResponse creates a failed metrics data response
+func (jc *SSHCollector) createFailResponse(metrics *jobtypes.Metrics, code 
int, message string) *jobtypes.CollectRepMetricsData {
+       return &jobtypes.CollectRepMetricsData{
+               ID:        0,  // Will be set by the calling context
+               MonitorID: 0,  // Will be set by the calling context
+               App:       "", // Will be set by the calling context
+               Metrics:   metrics.Name,
+               Priority:  0,
+               Time:      time.Now().UnixMilli(),
+               Code:      code,
+               Msg:       message,
+               Fields:    make([]jobtypes.Field, 0),
+               Values:    make([]jobtypes.ValueRow, 0),
+               Labels:    make(map[string]string),
+               Metadata:  make(map[string]string),
+       }
+}
+
+// createSuccessResponseWithResult creates a successful metrics data response 
with execution result
+func (sshc *SSHCollector) createSuccessResponseWithResult(metrics 
*jobtypes.Metrics, result string, responseTime int64) 
*jobtypes.CollectRepMetricsData {
+       response := &jobtypes.CollectRepMetricsData{
+               ID:        0,  // Will be set by the calling context
+               MonitorID: 0,  // Will be set by the calling context
+               App:       "", // Will be set by the calling context
+               Metrics:   metrics.Name,
+               Priority:  0,
+               Time:      time.Now().UnixMilli(),
+               // FIXME: use consts.CollectSuccess
+               Code:     200, // Success
+               Msg:      "success",
+               Fields:   make([]jobtypes.Field, 0),
+               Values:   make([]jobtypes.ValueRow, 0),
+               Labels:   make(map[string]string),
+               Metadata: make(map[string]string),
+       }
+
+       // Extract SSH configuration to get ParseType
+       sshConfig, err := extractSSHConfig(metrics.SSH)
+       if err != nil {
+               sshc.logger.Error(err, "failed to extract SSH config for 
parsing")
+               return response
+       }
+
+       // Parse response data based on ParseType
+       if strings.EqualFold(sshConfig.ParseType, ParseTypeOneRow) {
+               sshc.parseResponseDataByOne(result, metrics.AliasFields, 
response, responseTime)
+       } else if strings.EqualFold(sshConfig.ParseType, ParseTypeMultiRow) {
+               // Default to multi-row format
+               sshc.parseResponseDataByMulti(result, metrics.AliasFields, 
response, responseTime)
+       }
+
+       return response
+}
+
+// parseResponseDataByOne parses SSH script output in single-row format
+// Each line contains a single field value, matching the order of aliasFields
+func (sshc *SSHCollector) parseResponseDataByOne(result string, aliasFields 
[]string, response *jobtypes.CollectRepMetricsData, responseTime int64) {
+       lines := strings.Split(result, "\n")
+       if len(lines)+1 < len(aliasFields) {
+               sshc.logger.Error(fmt.Errorf("ssh response data not enough"), 
"result", result)
+               return
+       }
+
+       valueRow := jobtypes.ValueRow{
+               Columns: make([]string, 0),
+       }
+
+       aliasIndex := 0
+       lineIndex := 0
+
+       for aliasIndex < len(aliasFields) {
+               alias := aliasFields[aliasIndex]
+
+               if strings.EqualFold(alias, ResponseTime) {
+                       // Add response time for special field
+                       valueRow.Columns = append(valueRow.Columns, 
fmt.Sprintf("%d", responseTime))
+               } else {
+                       if lineIndex < len(lines) {
+                               valueRow.Columns = append(valueRow.Columns, 
strings.TrimSpace(lines[lineIndex]))
+                       } else {
+                               // Add null value if line not available
+                               valueRow.Columns = append(valueRow.Columns, 
NullValue)
+                       }
+                       lineIndex++
+               }
+               aliasIndex++
+       }
+
+       response.Values = append(response.Values, valueRow)
+
+       sshc.logger.Info("Parsed SSH script data (OneRow format)", 
"aliasFields", aliasFields, "lineCount", len(lines))
+
+       // Set Fields based on aliasFields
+       for _, alias := range aliasFields {
+               field := jobtypes.Field{
+                       Field: alias,
+                       Type:  1, // Default type
+                       Label: false,
+                       Unit:  "",
+               }
+               response.Fields = append(response.Fields, field)
+       }
+}
+
+// parseResponseDataByMulti parses SSH script output in multi-row format
+// First line contains field names, subsequent lines contain data
+func (sshc *SSHCollector) parseResponseDataByMulti(result string, aliasFields 
[]string, response *jobtypes.CollectRepMetricsData, responseTime int64) {
+       lines := strings.Split(result, "\n")
+       if len(lines) <= 1 {
+               sshc.logger.Error(fmt.Errorf("ssh response data only has 
header"), "result", result)
+               return
+       }
+
+       // Parse the first line as field names
+       fields := strings.Fields(lines[0]) // Use Fields instead of Split to 
handle multiple spaces
+       fieldMapping := make(map[string]int, len(fields))
+       for i, field := range fields {
+               fieldMapping[strings.ToLower(strings.TrimSpace(field))] = i
+       }
+
+       sshc.logger.Info("Parsed SSH script header fields", "fields", fields, 
"fieldMapping", fieldMapping)
+
+       // Parse subsequent lines as data rows
+       dataRowCount := 0
+       for i := 1; i < len(lines); i++ {
+               line := strings.TrimSpace(lines[i])
+               if line == "" {
+                       continue // Skip empty lines
+               }
+
+               values := strings.Fields(line) // Use Fields to handle multiple 
spaces
+               valueRow := jobtypes.ValueRow{
+                       Columns: make([]string, 0),
+               }
+
+               // Build columns based on aliasFields order
+               for _, alias := range aliasFields {
+                       if strings.EqualFold(alias, ResponseTime) {
+                               // Add response time for special field
+                               valueRow.Columns = append(valueRow.Columns, 
fmt.Sprintf("%d", responseTime))
+                       } else {
+                               // Find the field index in the mapping
+                               index, exists := 
fieldMapping[strings.ToLower(alias)]
+                               if exists && index < len(values) {
+                                       valueRow.Columns = 
append(valueRow.Columns, values[index])
+                               } else {
+                                       // Add null value if field not found or 
index out of range
+                                       valueRow.Columns = 
append(valueRow.Columns, NullValue)
+                                       sshc.logger.V(1).Info("Field not found 
or index out of range",
+                                               "alias", alias, "exists", 
exists, "index", index, "valuesLength", len(values))
+                               }
+                       }
+               }
+
+               response.Values = append(response.Values, valueRow)
+               dataRowCount++
+       }
+
+       sshc.logger.Info("Parsed SSH script data", "dataRows", dataRowCount, 
"aliasFields", aliasFields)
+
+       // Set Fields based on aliasFields
+       for _, alias := range aliasFields {
+               field := jobtypes.Field{
+                       Field: alias,
+                       Type:  1, // Default type
+                       Label: false,
+                       Unit:  "",
+               }
+               response.Fields = append(response.Fields, field)
+       }
+}
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go 
b/internal/collector/common/collect/dispatch/metrics_collector.go
index 0795a8a..1f46485 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -21,7 +21,6 @@ package dispatch
 
 import (
        "fmt"
-       "net/http"
        "time"
 
        // Import basic package with blank identifier to trigger its init() 
function
@@ -29,6 +28,7 @@ import (
        _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
        jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
@@ -67,7 +67,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics 
*jobtypes.Metrics, job *jobty
                                "protocol", metrics.Protocol,
                                "metricsName", metrics.Name)
 
-                       result := mc.createErrorResponse(metrics, job, 
http.StatusInternalServerError, fmt.Sprintf("Collector not found: %v", err))
+                       result := mc.createErrorResponse(metrics, job, 
constants.CollectUnavailable, fmt.Sprintf("Collector not found: %v", err))
                        resultChan <- result
                        return
                }
@@ -102,7 +102,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics 
*jobtypes.Metrics, job *jobty
                duration := time.Since(startTime)
 
                // Only log failures at INFO level, success at debug level
-               if result != nil && result.Code == http.StatusOK {
+               if result != nil && result.Code == constants.CollectSuccess {
                        mc.logger.V(1).Info("metrics collection completed 
successfully",
                                "metricsName", metrics.Name,
                                "protocol", metrics.Protocol,
diff --git a/internal/collector/common/collect/lazy_message_router.go 
b/internal/collector/common/collect/lazy_message_router.go
index caff0ec..3ad76e1 100644
--- a/internal/collector/common/collect/lazy_message_router.go
+++ b/internal/collector/common/collect/lazy_message_router.go
@@ -183,26 +183,6 @@ func (l *LazyMessageRouter) serializeToArrow(dataList 
[]*jobtypes.CollectRepMetr
 
 // createArrowRecordBatch creates Arrow RecordBatch for single MetricsData, 
compatible with Java Manager
 func (l *LazyMessageRouter) createArrowRecordBatch(mem memory.Allocator, data 
*jobtypes.CollectRepMetricsData) (arrow.Record, error) {
-       // Create metadata for fields (all fields use the same default metadata)
-       emptyMetadata := arrow.MetadataFrom(map[string]string{
-               "type":  "1",
-               "label": "false",
-               "unit":  "none",
-       })
-
-       // Define metadata fields (fixed fields)
-       metadataFields := []arrow.Field{
-               {Name: "app", Type: arrow.BinaryTypes.String, Nullable: true, 
Metadata: emptyMetadata},
-               {Name: "metrics", Type: arrow.BinaryTypes.String, Nullable: 
true, Metadata: emptyMetadata},
-               {Name: "id", Type: arrow.BinaryTypes.String, Nullable: true, 
Metadata: emptyMetadata},
-               {Name: "monitorId", Type: arrow.BinaryTypes.String, Nullable: 
true, Metadata: emptyMetadata},
-               {Name: "tenantId", Type: arrow.BinaryTypes.String, Nullable: 
true, Metadata: emptyMetadata},
-               {Name: "priority", Type: arrow.BinaryTypes.String, Nullable: 
true, Metadata: emptyMetadata},
-               {Name: "time", Type: arrow.BinaryTypes.String, Nullable: true, 
Metadata: emptyMetadata},
-               {Name: "code", Type: arrow.BinaryTypes.String, Nullable: true, 
Metadata: emptyMetadata},
-               {Name: "msg", Type: arrow.BinaryTypes.String, Nullable: true, 
Metadata: emptyMetadata},
-       }
-
        // Add dynamic fields (based on collected field definitions)
        dynamicFields := make([]arrow.Field, 0, len(data.Fields))
        for _, field := range data.Fields {
@@ -230,9 +210,8 @@ func (l *LazyMessageRouter) createArrowRecordBatch(mem 
memory.Allocator, data *j
                })
        }
 
-       // Merge all fields
-       allFields := make([]arrow.Field, 0, 
len(metadataFields)+len(dynamicFields))
-       allFields = append(allFields, metadataFields...)
+       // Merge dynamic fields only, as metadata fields are not needed in 
final schema
+       allFields := make([]arrow.Field, 0, len(dynamicFields))
        allFields = append(allFields, dynamicFields...)
 
        // Create schema-level metadata
@@ -268,33 +247,20 @@ func (l *LazyMessageRouter) createArrowRecordBatch(mem 
memory.Allocator, data *j
 
        // Fill data
        for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
-               // Fill metadata fields
-               builders[0].(*array.StringBuilder).Append(data.App)
-               builders[1].(*array.StringBuilder).Append(data.Metrics)
-               builders[2].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.ID))
-               builders[3].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.MonitorID))
-               builders[4].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.TenantID))
-               builders[5].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.Priority))
-               builders[6].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.Time))
-               builders[7].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.Code))
-               builders[8].(*array.StringBuilder).Append(data.Msg)
-
                // Fill dynamic field data
                if rowIdx < len(data.Values) {
                        valueRow := data.Values[rowIdx]
-                       for i, _ := range data.Fields {
-                               builderIdx := len(metadataFields) + i
+                       for i := range data.Fields {
                                var value string
                                if i < len(valueRow.Columns) {
                                        value = valueRow.Columns[i]
                                }
-                               
builders[builderIdx].(*array.StringBuilder).Append(value)
+                               builders[i].(*array.StringBuilder).Append(value)
                        }
                } else {
                        // Fill empty values
                        for i := range data.Fields {
-                               builderIdx := len(metadataFields) + i
-                               
builders[builderIdx].(*array.StringBuilder).Append("")
+                               builders[i].(*array.StringBuilder).Append("")
                        }
                }
        }
diff --git a/internal/collector/common/collect/result_handler.go 
b/internal/collector/common/collect/result_handler.go
index 48cc240..cd16820 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/collector/common/collect/result_handler.go
@@ -21,9 +21,9 @@ package collect
 
 import (
        "fmt"
-       "net/http"
 
        jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
@@ -69,7 +69,7 @@ func (rh *ResultHandlerImpl) HandleCollectData(data 
*jobtypes.CollectRepMetricsD
                }
 
                // Log successful result sending
-               if data.Code == http.StatusOK {
+               if data.Code == constants.CollectSuccess {
                        rh.logger.V(1).Info("successfully sent collection 
result to Manager",
                                "metricsName", data.Metrics)
                } else {
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go 
b/internal/collector/common/dispatcher/common_dispatcher.go
index 8d37be5..288e0c1 100644
--- a/internal/collector/common/dispatcher/common_dispatcher.go
+++ b/internal/collector/common/dispatcher/common_dispatcher.go
@@ -68,6 +68,7 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx 
context.Context, job *jo
 
        startTime := time.Now()
 
+       job.ConstructPriorMetrics()
        // Get the next metrics to collect based on job priority and 
dependencies
        metricsToCollect := job.NextCollectMetrics()
        if len(metricsToCollect) == 0 {
diff --git a/internal/collector/common/ssh/.keep 
b/internal/collector/common/ssh/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/common/ssh/ssh_helper.go 
b/internal/collector/common/ssh/ssh_helper.go
new file mode 100644
index 0000000..fbecb76
--- /dev/null
+++ b/internal/collector/common/ssh/ssh_helper.go
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package ssh
+
+import (
+       "context"
+       "crypto/x509"
+       "encoding/pem"
+       "fmt"
+       "net"
+       "strings"
+
+       "golang.org/x/crypto/ssh"
+
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// CreateSSHClientConfig creates SSH client configuration based on the 
protocol config
+func CreateSSHClientConfig(config *jobtypes.SSHProtocol, logger logger.Logger) 
(*ssh.ClientConfig, error) {
+       clientConfig := &ssh.ClientConfig{
+               User:            config.Username,
+               HostKeyCallback: ssh.InsecureIgnoreHostKey(), // Note: In 
production, you should verify host keys
+       }
+
+       // Configure authentication
+       if config.PrivateKey != "" && strings.TrimSpace(config.PrivateKey) != 
"" {
+               logger.Info("Using private key authentication")
+               // Use private key authentication
+               signer, err := ParsePrivateKey(config.PrivateKey)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to parse private key: 
%w", err)
+               }
+               clientConfig.Auth = []ssh.AuthMethod{ssh.PublicKeys(signer)}
+       } else if config.Password != "" {
+               logger.Info("Using password authentication")
+               // Use password authentication
+               clientConfig.Auth = 
[]ssh.AuthMethod{ssh.Password(config.Password)}
+       } else {
+               return nil, fmt.Errorf("either password or private key must be 
provided")
+       }
+
+       return clientConfig, nil
+}
+
+// ParsePrivateKey parses a private key string and returns an ssh.Signer
+func ParsePrivateKey(privateKeyStr string) (ssh.Signer, error) {
+       // Decode the PEM block
+       block, _ := pem.Decode([]byte(privateKeyStr))
+       if block == nil {
+               return nil, fmt.Errorf("failed to decode PEM block from private 
key")
+       }
+
+       // Parse the private key
+       var privateKey interface{}
+       var err error
+
+       switch block.Type {
+       case "RSA PRIVATE KEY":
+               privateKey, err = x509.ParsePKCS1PrivateKey(block.Bytes)
+       case "PRIVATE KEY":
+               privateKey, err = x509.ParsePKCS8PrivateKey(block.Bytes)
+       case "EC PRIVATE KEY":
+               privateKey, err = x509.ParseECPrivateKey(block.Bytes)
+       case "OPENSSH PRIVATE KEY":
+               // Use ssh.ParseRawPrivateKey for OpenSSH format
+               privateKey, err = ssh.ParseRawPrivateKey([]byte(privateKeyStr))
+       default:
+               return nil, fmt.Errorf("unsupported private key type: %s", 
block.Type)
+       }
+
+       if err != nil {
+               return nil, fmt.Errorf("failed to parse private key: %w", err)
+       }
+
+       // Create SSH signer
+       signer, err := ssh.NewSignerFromKey(privateKey)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create signer from private 
key: %w", err)
+       }
+
+       return signer, nil
+}
+
+// createProxyClientConfig creates SSH client configuration for proxy server
+func createProxyClientConfig(config *jobtypes.SSHProtocol, logger 
logger.Logger) (*ssh.ClientConfig, error) {
+       clientConfig := &ssh.ClientConfig{
+               User:            config.ProxyUsername,
+               HostKeyCallback: ssh.InsecureIgnoreHostKey(),
+       }
+
+       // Configure authentication for proxy
+       if config.ProxyPrivateKey != "" && 
strings.TrimSpace(config.ProxyPrivateKey) != "" {
+               logger.Info("Using proxy private key authentication")
+               // Use private key authentication for proxy
+               signer, err := ParsePrivateKey(config.ProxyPrivateKey)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to parse proxy private 
key: %w", err)
+               }
+               clientConfig.Auth = []ssh.AuthMethod{ssh.PublicKeys(signer)}
+       } else if config.ProxyPassword != "" {
+               logger.Info("Using proxy password authentication")
+               // Use password authentication for proxy
+               clientConfig.Auth = 
[]ssh.AuthMethod{ssh.Password(config.ProxyPassword)}
+       } else {
+               return nil, fmt.Errorf("either proxy password or proxy private 
key must be provided")
+       }
+
+       return clientConfig, nil
+}
+
+// dialViaProxy establishes SSH connection through a proxy server
+func dialViaProxy(ctx context.Context, config *jobtypes.SSHProtocol, 
targetConfig *ssh.ClientConfig, logger logger.Logger) (*ssh.Client, error) {
+       // Create proxy client configuration
+       proxyConfig, err := createProxyClientConfig(config, logger)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create proxy client config: 
%w", err)
+       }
+
+       // Set timeout for proxy connection
+       if timeout := targetConfig.Timeout; timeout > 0 {
+               proxyConfig.Timeout = timeout
+       }
+
+       // Connect to proxy server
+       proxyAddress := net.JoinHostPort(config.ProxyHost, config.ProxyPort)
+       logger.Info("Connecting to proxy server", "address", proxyAddress)
+
+       // Create dialer with context for proxy connection
+       dialer := &net.Dialer{}
+       proxyConn, err := dialer.DialContext(ctx, "tcp", proxyAddress)
+       if err != nil {
+               return nil, fmt.Errorf("failed to connect to proxy server %s: 
%w", proxyAddress, err)
+       }
+
+       // Create SSH client connection to proxy
+       proxyClientConn, proxyChans, proxyReqs, err := 
ssh.NewClientConn(proxyConn, proxyAddress, proxyConfig)
+       if err != nil {
+               proxyConn.Close()
+               return nil, fmt.Errorf("failed to establish SSH connection to 
proxy: %w", err)
+       }
+       proxyClient := ssh.NewClient(proxyClientConn, proxyChans, proxyReqs)
+
+       // Create connection to target server through proxy
+       targetAddress := net.JoinHostPort(config.Host, config.Port)
+       logger.Info("Connecting to target server via proxy", "target", 
targetAddress)
+
+       targetConn, err := proxyClient.Dial("tcp", targetAddress)
+       if err != nil {
+               proxyClient.Close()
+               return nil, fmt.Errorf("failed to dial target server %s via 
proxy: %w", targetAddress, err)
+       }
+
+       // Create SSH client connection to target server
+       targetClientConn, targetChans, targetReqs, err := 
ssh.NewClientConn(targetConn, targetAddress, targetConfig)
+       if err != nil {
+               targetConn.Close()
+               proxyClient.Close()
+               return nil, fmt.Errorf("failed to establish SSH connection to 
target server: %w", err)
+       }
+
+       // Note: We need to keep the proxy client alive, so we don't close it 
here
+       // The target client will handle the connection lifecycle
+       return ssh.NewClient(targetClientConn, targetChans, targetReqs), nil
+}
+
+// DialWithContext dials SSH connection with context support
+func DialWithContext(ctx context.Context, network, addr string, config 
*ssh.ClientConfig) (*ssh.Client, error) {
+       // Create a dialer with context
+       dialer := &net.Dialer{}
+       conn, err := dialer.DialContext(ctx, network, addr)
+       if err != nil {
+               return nil, err
+       }
+
+       // Create SSH client connection
+       c, chans, reqs, err := ssh.NewClientConn(conn, addr, config)
+       if err != nil {
+               conn.Close()
+               return nil, err
+       }
+
+       return ssh.NewClient(c, chans, reqs), nil
+}
+
+// DialWithProxy dials SSH connection through proxy with context support
+func DialWithProxy(ctx context.Context, sshConfig *jobtypes.SSHProtocol, 
clientConfig *ssh.ClientConfig, logger logger.Logger) (*ssh.Client, error) {
+       // TODO: implement Reuse connection logic
+
+       // Check if proxy is enabled and configured
+       if sshConfig.UseProxy != "true" || sshConfig.ProxyHost == "" || 
sshConfig.ProxyPort == "" {
+               // Use direct connection if proxy is not enabled or not 
configured
+               address := net.JoinHostPort(sshConfig.Host, sshConfig.Port)
+               return DialWithContext(ctx, "tcp", address, clientConfig)
+       }
+
+       // Validate proxy configuration
+       if sshConfig.ProxyUsername == "" {
+               return nil, fmt.Errorf("proxy username is required when using 
proxy")
+       }
+       if sshConfig.ProxyPassword == "" && sshConfig.ProxyPrivateKey == "" {
+               return nil, fmt.Errorf("either proxy password or proxy private 
key is required when using proxy")
+       }
+
+       logger.Info("Using proxy connection", "proxyHost", sshConfig.ProxyHost, 
"proxyPort", sshConfig.ProxyPort)
+       return dialViaProxy(ctx, sshConfig, clientConfig, logger)
+}
diff --git a/internal/collector/common/types/job/job_types.go 
b/internal/collector/common/types/job/job_types.go
index 70c3c10..aca80cd 100644
--- a/internal/collector/common/types/job/job_types.go
+++ b/internal/collector/common/types/job/job_types.go
@@ -17,6 +17,14 @@
 
 package job
 
+import (
+       "log"
+       "math"
+       "sort"
+       "sync"
+       "time"
+)
+
 // hertzbeat Collect Job related types
 
 // Job represents a complete monitoring job
@@ -49,8 +57,10 @@ type Job struct {
        // Internal fields
        EnvConfigmaps    map[string]Configmap `json:"-"`
        DispatchTime     int64                `json:"-"`
-       PriorMetrics     []Metrics            `json:"-"`
+       PriorMetrics     [][]*Metrics         `json:"-"` // LinkedList of 
Set<Metrics>
        ResponseDataTemp []MetricsData        `json:"-"`
+
+       mu sync.Mutex // Mutex for thread-safe operations
 }
 
 // NextCollectMetrics returns the metrics that should be collected next
@@ -65,7 +75,85 @@ func (j *Job) NextCollectMetrics() []*Metrics {
 }
 
 // ConstructPriorMetrics prepares prior metrics for collection dependencies
+// This method filters metrics that need to be collected based on time,
+// sets default values, groups by priority, and constructs an ordered 
collection queue
 func (j *Job) ConstructPriorMetrics() {
-       j.PriorMetrics = make([]Metrics, len(j.Metrics))
-       copy(j.PriorMetrics, j.Metrics)
+       j.mu.Lock()
+       defer j.mu.Unlock()
+
+       now := time.Now().UnixMilli()
+
+       // Filter metrics that need to be collected and set default values
+       currentCollectMetrics := make(map[int][]*Metrics)
+
+       for i := range j.Metrics {
+               metric := &j.Metrics[i]
+
+               // Check if it's time to collect this metric
+               if now >= metric.CollectTime+metric.Interval*1000 {
+                       // Update collect time
+                       metric.CollectTime = now
+
+                       // Set default AliasFields if not configured
+                       if len(metric.AliasFields) == 0 && len(metric.Fields) > 
0 {
+                               aliasFields := make([]string, 
len(metric.Fields))
+                               for idx, field := range metric.Fields {
+                                       aliasFields[idx] = field.Field
+                               }
+                               metric.AliasFields = aliasFields
+                       }
+
+                       // Group by priority
+                       priority := metric.Priority
+                       currentCollectMetrics[priority] = 
append(currentCollectMetrics[priority], metric)
+               }
+       }
+
+       // If no metrics need to be collected, add the default availability 
metric (priority 0)
+       if len(currentCollectMetrics) == 0 {
+               for i := range j.Metrics {
+                       metric := &j.Metrics[i]
+                       if metric.Priority == 0 {
+                               metric.CollectTime = now
+                               currentCollectMetrics[0] = []*Metrics{metric}
+                               break
+                       }
+               }
+
+               // If still empty, log error
+               if len(currentCollectMetrics) == 0 {
+                       log.Println("ERROR: metrics must has one priority 0 
metrics at least.")
+               }
+       }
+
+       // Construct a linked list (slice) of task execution order of the 
metrics
+       // Each element is a set (slice) of metrics with the same priority
+       j.PriorMetrics = make([][]*Metrics, 0, len(currentCollectMetrics))
+
+       for _, metricsSet := range currentCollectMetrics {
+               // Make a copy to ensure thread safety
+               metricsCopy := make([]*Metrics, len(metricsSet))
+               copy(metricsCopy, metricsSet)
+               j.PriorMetrics = append(j.PriorMetrics, metricsCopy)
+       }
+
+       // Sort by priority (ascending order)
+       sort.Slice(j.PriorMetrics, func(i, k int) bool {
+               // Get priority from the first metric in each set
+               var iPriority, kPriority int = math.MaxInt, math.MaxInt
+
+               if len(j.PriorMetrics[i]) > 0 && j.PriorMetrics[i][0] != nil {
+                       iPriority = j.PriorMetrics[i][0].Priority
+               }
+               if len(j.PriorMetrics[k]) > 0 && j.PriorMetrics[k][0] != nil {
+                       kPriority = j.PriorMetrics[k][0].Priority
+               }
+
+               return iPriority < kPriority
+       })
+
+       // Initialize EnvConfigmaps
+       if j.EnvConfigmaps == nil {
+               j.EnvConfigmaps = make(map[string]Configmap, 8)
+       }
 }
diff --git a/internal/collector/common/types/job/metrics_types.go 
b/internal/collector/common/types/job/metrics_types.go
index 04f035e..ef88be8 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/collector/common/types/job/metrics_types.go
@@ -31,9 +31,8 @@ type Metrics struct {
        Interval    int64             `json:"interval"`
        Visible     *bool             `json:"visible"`
        Fields      []Field           `json:"fields"`
-       Aliasfields []string          `json:"aliasfields"`
-       AliasFields []string          `json:"aliasFields"` // Alternative field 
name
-       Calculates  interface{}       `json:"calculates"`  // Can be 
[]Calculate or []string
+       AliasFields []string          `json:"aliasFields"`
+       Calculates  interface{}       `json:"calculates"` // Can be []Calculate 
or []string
        Filters     interface{}       `json:"filters"`
        Units       interface{}       `json:"units"` // Can be []Unit or 
[]string
        Protocol    string            `json:"protocol"`
@@ -46,7 +45,7 @@ type Metrics struct {
 
        // Protocol specific fields
        HTTP    *HTTPProtocol    `json:"http,omitempty"`
-       SSH     *SSHProtocol     `json:"ssh,omitempty"`
+       SSH     interface{}      `json:"ssh,omitempty"`  // Can be SSHProtocol
        JDBC    interface{}      `json:"jdbc,omitempty"` // Can be JDBCProtocol 
or map[string]interface{}
        SNMP    *SNMPProtocol    `json:"snmp,omitempty"`
        JMX     *JMXProtocol     `json:"jmx,omitempty"`
@@ -185,14 +184,23 @@ type HTTPProtocol struct {
 
 // SSHProtocol represents SSH protocol configuration
 type SSHProtocol struct {
-       Host        string `json:"host"`
-       Port        int    `json:"port"`
-       Username    string `json:"username"`
-       Password    string `json:"password"`
-       PrivateKey  string `json:"privateKey"`
-       Script      string `json:"script"`
-       ParseScript string `json:"parseScript"`
-       Timeout     int    `json:"timeout"`
+       Host                 string `json:"host"`
+       Port                 string `json:"port"`
+       Username             string `json:"username"`
+       Password             string `json:"password"`
+       PrivateKey           string `json:"privateKey"`
+       PrivateKeyPassphrase string `json:"privateKeyPassphrase"`
+       Script               string `json:"script"`
+       ParseType            string `json:"parseType"`
+       ParseScript          string `json:"parseScript"`
+       Timeout              string `json:"timeout"`
+       ReuseConnection      string `json:"reuseConnection"`
+       UseProxy             string `json:"useProxy"`
+       ProxyHost            string `json:"proxyHost"`
+       ProxyPort            string `json:"proxyPort"`
+       ProxyUsername        string `json:"proxyUsername"`
+       ProxyPassword        string `json:"proxyPassword"`
+       ProxyPrivateKey      string `json:"proxyPrivateKey"`
 }
 
 // JDBCProtocol represents JDBC protocol configuration
@@ -328,12 +336,6 @@ func (j *Job) Clone() *Job {
                                copy(clone.Metrics[i].Fields, metric.Fields)
                        }
 
-                       // Deep copy Aliasfields slice
-                       if metric.Aliasfields != nil {
-                               clone.Metrics[i].Aliasfields = make([]string, 
len(metric.Aliasfields))
-                               copy(clone.Metrics[i].Aliasfields, 
metric.Aliasfields)
-                       }
-
                        // Deep copy AliasFields slice
                        if metric.AliasFields != nil {
                                clone.Metrics[i].AliasFields = make([]string, 
len(metric.AliasFields))
diff --git a/internal/constants/const.go b/internal/constants/const.go
index 94f87d8..1376097 100644
--- a/internal/constants/const.go
+++ b/internal/constants/const.go
@@ -42,3 +42,13 @@ const (
 const (
        CollectorModule = "collector"
 )
+
+// Collect Response status code
+const (
+       CollectSuccess       = 0
+       CollectUnavailable   = 1
+       CollectUnReachable   = 2
+       CollectUnConnectable = 3
+       CollectFail          = 4
+       CollectTimeout       = 5
+)
diff --git a/internal/util/param/param_replacer.go 
b/internal/util/param/param_replacer.go
index ff6a944..1b23781 100644
--- a/internal/util/param/param_replacer.go
+++ b/internal/util/param/param_replacer.go
@@ -188,6 +188,12 @@ func (r *Replacer) replaceMetricsParams(metrics 
*jobtypes.Metrics, paramMap map[
        // For now, other protocols (HTTP, SSH, etc.) are concrete struct 
pointers and
        // don't require parameter replacement
 
+       if metrics.SSH != nil {
+               if err := r.replaceProtocolParams(&metrics.SSH, paramMap); err 
!= nil {
+                       return fmt.Errorf("failed to replace SSH params: %w", 
err)
+               }
+       }
+
        // Replace parameters in basic metrics fields
        if err := r.replaceBasicMetricsParams(metrics, paramMap); err != nil {
                return fmt.Errorf("failed to replace basic metrics params: %w", 
err)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to