This is an automated email from the ASF dual-hosted git repository.
shown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new 25ecbd2 feat: implement SSH collector with proxy support and optimize
job scheduling (#25)
25ecbd2 is described below
commit 25ecbd23e04ead5feaef840385372341f3cfbebe
Author: Yang Yexuan <[email protected]>
AuthorDate: Fri Oct 17 22:22:33 2025 +0800
feat: implement SSH collector with proxy support and optimize job
scheduling (#25)
---
go.mod | 2 +-
go.sum | 2 +
.../collector/basic/database/jdbc_collector.go | 15 +-
internal/collector/basic/init.go | 5 +
internal/collector/basic/ssh/.keep | 0
internal/collector/basic/ssh/ssh_collector.go | 390 +++++++++++++++++++++
.../common/collect/dispatch/metrics_collector.go | 6 +-
.../common/collect/lazy_message_router.go | 44 +--
.../collector/common/collect/result_handler.go | 4 +-
.../common/dispatcher/common_dispatcher.go | 1 +
internal/collector/common/ssh/.keep | 0
internal/collector/common/ssh/ssh_helper.go | 224 ++++++++++++
internal/collector/common/types/job/job_types.go | 94 ++++-
.../collector/common/types/job/metrics_types.go | 38 +-
internal/constants/const.go | 10 +
internal/util/param/param_replacer.go | 6 +
16 files changed, 768 insertions(+), 73 deletions(-)
diff --git a/go.mod b/go.mod
index 88dc66f..6115950 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
github.com/spf13/cobra v1.10.1
github.com/stretchr/testify v1.10.0
go.uber.org/zap v1.27.0
+ golang.org/x/crypto v0.39.0
google.golang.org/grpc v1.75.0
google.golang.org/protobuf v1.36.8
gopkg.in/yaml.v3 v3.0.1
@@ -41,7 +42,6 @@ require (
github.com/spf13/pflag v1.0.9 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
- golang.org/x/crypto v0.39.0 // indirect
golang.org/x/mod v0.25.0 // indirect
golang.org/x/net v0.41.0 // indirect
golang.org/x/sync v0.15.0 // indirect
diff --git a/go.sum b/go.sum
index 1e8a909..e914d56 100644
--- a/go.sum
+++ b/go.sum
@@ -121,6 +121,8 @@ golang.org/x/sync v0.15.0/go.mod
h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg=
+golang.org/x/term v0.32.0/go.mod
h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod
h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc=
diff --git a/internal/collector/basic/database/jdbc_collector.go
b/internal/collector/basic/database/jdbc_collector.go
index dd74b2f..86f01be 100644
--- a/internal/collector/basic/database/jdbc_collector.go
+++ b/internal/collector/basic/database/jdbc_collector.go
@@ -29,6 +29,7 @@ import (
_ "github.com/lib/pq"
_ "github.com/microsoft/go-mssqldb"
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
)
@@ -50,10 +51,10 @@ const (
PlatformOracle = "oracle"
// Response codes
- CodeSuccess = 200
- CodeFail = 500
- CodeUnReachable = 503
- CodeUnConnectable = 502
+ CodeSuccess = constants.CollectSuccess
+ CodeFail = constants.CollectFail
+ CodeUnReachable = constants.CollectUnReachable
+ CodeUnConnectable = constants.CollectUnConnectable
)
// JDBCCollector implements JDBC database collection
@@ -178,11 +179,11 @@ func (jc *JDBCCollector) Collect(metrics
*jobtypes.Metrics) *jobtypes.CollectRep
switch jdbcConfig.QueryType {
case QueryTypeOneRow:
- err = jc.queryOneRow(db, jdbcConfig.SQL, metrics.Aliasfields,
response)
+ err = jc.queryOneRow(db, jdbcConfig.SQL, metrics.AliasFields,
response)
case QueryTypeMultiRow:
- err = jc.queryMultiRow(db, jdbcConfig.SQL, metrics.Aliasfields,
response)
+ err = jc.queryMultiRow(db, jdbcConfig.SQL, metrics.AliasFields,
response)
case QueryTypeColumns:
- err = jc.queryColumns(db, jdbcConfig.SQL, metrics.Aliasfields,
response)
+ err = jc.queryColumns(db, jdbcConfig.SQL, metrics.AliasFields,
response)
case QueryTypeRunScript:
err = jc.runScript(db, jdbcConfig.SQL, response)
default:
diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go
index f4a7504..68a282e 100644
--- a/internal/collector/basic/init.go
+++ b/internal/collector/basic/init.go
@@ -21,6 +21,7 @@ package basic
import (
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/ssh"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
@@ -34,6 +35,10 @@ func init() {
return database.NewJDBCCollector(logger)
})
+ strategy.RegisterFactory("ssh", func(logger logger.Logger)
strategy.Collector {
+ return ssh.NewSSHCollector(logger)
+ })
+
// More protocols can be added here in the future:
// strategy.RegisterFactory("http", func(logger logger.Logger)
strategy.Collector {
// return http.NewHTTPCollector(logger)
diff --git a/internal/collector/basic/ssh/.keep
b/internal/collector/basic/ssh/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/basic/ssh/ssh_collector.go
b/internal/collector/basic/ssh/ssh_collector.go
new file mode 100644
index 0000000..42734e2
--- /dev/null
+++ b/internal/collector/basic/ssh/ssh_collector.go
@@ -0,0 +1,390 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package ssh
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "net"
+ "strconv"
+ "strings"
+ "time"
+
+ sshhelper
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/ssh"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
+)
+
+const (
+ ProtocolSSH = "ssh"
+
+ // Special fields
+ ResponseTime = "responseTime"
+ NullValue = " "
+
+ // Parse types
+ ParseTypeOneRow = "oneRow" // Each line corresponds to one field
value, in order of aliasFields
+ ParseTypeMultiRow = "multiRow" // First line is header with field
names, subsequent lines are data rows
+)
+
+// SSHCollector implements SSH collection
+type SSHCollector struct {
+ logger logger.Logger
+}
+
+// NewSSHCollector creates a new SSH collector
+func NewSSHCollector(logger logger.Logger) *SSHCollector {
+ return &SSHCollector{
+ logger: logger.WithName("ssh-collector"),
+ }
+}
+
+// extractSSHConfig extracts SSH configuration from interface{} type
+// This function uses the parameter replacer for consistent configuration
extraction
+func extractSSHConfig(sshInterface interface{}) (*jobtypes.SSHProtocol, error)
{
+ replacer := param.NewReplacer()
+ return replacer.ExtractSSHConfig(sshInterface)
+}
+
+// PreCheck validates the SSH metrics configuration
+func (jc *SSHCollector) PreCheck(metrics *jobtypes.Metrics) error {
+ if metrics == nil {
+ return fmt.Errorf("metrics is nil")
+ }
+
+ if metrics.SSH == nil {
+ return fmt.Errorf("SSH protocol configuration is required")
+ }
+
+ // Extract SSH configuration
+ sshConfig, err := extractSSHConfig(metrics.SSH)
+ if err != nil {
+ return fmt.Errorf("invalid SSH configuration: %w", err)
+ }
+ if sshConfig == nil {
+ return fmt.Errorf("SSH configuration is required")
+ }
+
+ // Validate required fields
+ if sshConfig.Host == "" {
+ return fmt.Errorf("host is required")
+ }
+ if sshConfig.Port == "" {
+ return fmt.Errorf("port is required")
+ }
+ if sshConfig.Username == "" {
+ return fmt.Errorf("username is required")
+ }
+ // Either password or private key must be provided for authentication
+ if sshConfig.Password == "" && sshConfig.PrivateKey == "" {
+ return fmt.Errorf("either password or privateKey is required
for authentication")
+ }
+ if sshConfig.Script == "" {
+ return fmt.Errorf("script is required")
+ }
+ // parseScript and timeout are optional, will use defaults if not
provided
+
+ return nil
+}
+
+// Collect performs SSH metrics collection
+func (sshc *SSHCollector) Collect(metrics *jobtypes.Metrics)
*jobtypes.CollectRepMetricsData {
+ startTime := time.Now()
+
+ // Extract SSH configuration
+ sshConfig, err := extractSSHConfig(metrics.SSH)
+ if err != nil {
+ sshc.logger.Error(err, "failed to extract SSH config")
+ return sshc.createFailResponse(metrics, consts.CollectFail,
fmt.Sprintf("SSH config error: %v", err))
+ }
+ if sshConfig == nil {
+ return sshc.createFailResponse(metrics, consts.CollectFail,
"SSH configuration is required")
+ }
+
+ // Debug level only for collection start
+ sshc.logger.V(1).Info("starting SSH collection",
+ "host", sshConfig.Host,
+ "port", sshConfig.Port)
+
+ // Get timeout
+ timeout := sshc.getTimeout(sshConfig.Timeout)
+
+ // Execute SSH script
+ result, err := sshc.executeSSHScript(sshConfig, timeout)
+ if err != nil {
+ sshc.logger.Error(err, "failed to execute SSH script")
+ return sshc.createFailResponse(metrics,
consts.CollectUnConnectable, fmt.Sprintf("SSH execution error: %v", err))
+ }
+
+ // Calculate response time
+ responseTime := time.Since(startTime).Milliseconds()
+
+ // Create success response with result
+ response := sshc.createSuccessResponseWithResult(metrics, result,
responseTime)
+
+ return response
+}
+
+// Protocol returns the protocol this collector supports
+func (sshc *SSHCollector) Protocol() string {
+ return ProtocolSSH
+}
+
+// getTimeout parses timeout string and returns duration
+func (sshc *SSHCollector) getTimeout(timeoutStr string) time.Duration {
+ if timeoutStr == "" {
+ return 30 * time.Second // default timeout
+ }
+
+ // First try parsing as duration string (e.g., "10s", "5m", "500ms")
+ if duration, err := time.ParseDuration(timeoutStr); err == nil {
+ return duration
+ }
+
+ if timeout, err := strconv.Atoi(timeoutStr); err == nil {
+ return time.Duration(timeout) * time.Millisecond
+ }
+
+ return 30 * time.Second // fallback to default
+}
+
+// executeSSHScript executes the SSH script and returns the result
+func (sshc *SSHCollector) executeSSHScript(config *jobtypes.SSHProtocol,
timeout time.Duration) (string, error) {
+ // Create SSH client configuration using helper function
+ clientConfig, err := sshhelper.CreateSSHClientConfig(config,
sshc.logger)
+ if err != nil {
+ return "", fmt.Errorf("failed to create SSH client config: %w",
err)
+ }
+
+ // Set timeout for the SSH connection
+ clientConfig.Timeout = timeout
+
+ // Create context with timeout for the entire operation
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ // Use helper function to dial with context (with proxy support)
+ conn, err := sshhelper.DialWithProxy(ctx, config, clientConfig,
sshc.logger)
+ if err != nil {
+ address := net.JoinHostPort(config.Host, config.Port)
+ return "", fmt.Errorf("failed to connect to SSH server %s: %w",
address, err)
+ }
+ defer conn.Close()
+
+ // Create a session
+ session, err := conn.NewSession()
+ if err != nil {
+ return "", fmt.Errorf("failed to create SSH session: %w", err)
+ }
+ defer session.Close()
+
+ // Execute the script and capture output
+ var stdout bytes.Buffer
+ var stderr bytes.Buffer
+ session.Stdout = &stdout
+ session.Stderr = &stderr
+
+ // Execute the command
+ err = session.Run(config.Script)
+ if err != nil {
+ stderrStr := stderr.String()
+ if stderrStr != "" {
+ return "", fmt.Errorf("script execution failed: %w,
stderr: %s", err, stderrStr)
+ }
+ return "", fmt.Errorf("script execution failed: %w", err)
+ }
+ sshc.logger.Info("SSH script executed successfully")
+ sshc.logger.Info("SSH script output", "output", stdout.String())
+
+ return stdout.String(), nil
+}
+
+// createFailResponse creates a failed metrics data response
+func (jc *SSHCollector) createFailResponse(metrics *jobtypes.Metrics, code
int, message string) *jobtypes.CollectRepMetricsData {
+ return &jobtypes.CollectRepMetricsData{
+ ID: 0, // Will be set by the calling context
+ MonitorID: 0, // Will be set by the calling context
+ App: "", // Will be set by the calling context
+ Metrics: metrics.Name,
+ Priority: 0,
+ Time: time.Now().UnixMilli(),
+ Code: code,
+ Msg: message,
+ Fields: make([]jobtypes.Field, 0),
+ Values: make([]jobtypes.ValueRow, 0),
+ Labels: make(map[string]string),
+ Metadata: make(map[string]string),
+ }
+}
+
+// createSuccessResponseWithResult creates a successful metrics data response
with execution result
+func (sshc *SSHCollector) createSuccessResponseWithResult(metrics
*jobtypes.Metrics, result string, responseTime int64)
*jobtypes.CollectRepMetricsData {
+ response := &jobtypes.CollectRepMetricsData{
+ ID: 0, // Will be set by the calling context
+ MonitorID: 0, // Will be set by the calling context
+ App: "", // Will be set by the calling context
+ Metrics: metrics.Name,
+ Priority: 0,
+ Time: time.Now().UnixMilli(),
+ // FIXME: use consts.CollectSuccess
+ Code: 200, // Success
+ Msg: "success",
+ Fields: make([]jobtypes.Field, 0),
+ Values: make([]jobtypes.ValueRow, 0),
+ Labels: make(map[string]string),
+ Metadata: make(map[string]string),
+ }
+
+ // Extract SSH configuration to get ParseType
+ sshConfig, err := extractSSHConfig(metrics.SSH)
+ if err != nil {
+ sshc.logger.Error(err, "failed to extract SSH config for
parsing")
+ return response
+ }
+
+ // Parse response data based on ParseType
+ if strings.EqualFold(sshConfig.ParseType, ParseTypeOneRow) {
+ sshc.parseResponseDataByOne(result, metrics.AliasFields,
response, responseTime)
+ } else if strings.EqualFold(sshConfig.ParseType, ParseTypeMultiRow) {
+ // Default to multi-row format
+ sshc.parseResponseDataByMulti(result, metrics.AliasFields,
response, responseTime)
+ }
+
+ return response
+}
+
+// parseResponseDataByOne parses SSH script output in single-row format
+// Each line contains a single field value, matching the order of aliasFields
+func (sshc *SSHCollector) parseResponseDataByOne(result string, aliasFields
[]string, response *jobtypes.CollectRepMetricsData, responseTime int64) {
+ lines := strings.Split(result, "\n")
+ if len(lines)+1 < len(aliasFields) {
+ sshc.logger.Error(fmt.Errorf("ssh response data not enough"),
"result", result)
+ return
+ }
+
+ valueRow := jobtypes.ValueRow{
+ Columns: make([]string, 0),
+ }
+
+ aliasIndex := 0
+ lineIndex := 0
+
+ for aliasIndex < len(aliasFields) {
+ alias := aliasFields[aliasIndex]
+
+ if strings.EqualFold(alias, ResponseTime) {
+ // Add response time for special field
+ valueRow.Columns = append(valueRow.Columns,
fmt.Sprintf("%d", responseTime))
+ } else {
+ if lineIndex < len(lines) {
+ valueRow.Columns = append(valueRow.Columns,
strings.TrimSpace(lines[lineIndex]))
+ } else {
+ // Add null value if line not available
+ valueRow.Columns = append(valueRow.Columns,
NullValue)
+ }
+ lineIndex++
+ }
+ aliasIndex++
+ }
+
+ response.Values = append(response.Values, valueRow)
+
+ sshc.logger.Info("Parsed SSH script data (OneRow format)",
"aliasFields", aliasFields, "lineCount", len(lines))
+
+ // Set Fields based on aliasFields
+ for _, alias := range aliasFields {
+ field := jobtypes.Field{
+ Field: alias,
+ Type: 1, // Default type
+ Label: false,
+ Unit: "",
+ }
+ response.Fields = append(response.Fields, field)
+ }
+}
+
+// parseResponseDataByMulti parses SSH script output in multi-row format
+// First line contains field names, subsequent lines contain data
+func (sshc *SSHCollector) parseResponseDataByMulti(result string, aliasFields
[]string, response *jobtypes.CollectRepMetricsData, responseTime int64) {
+ lines := strings.Split(result, "\n")
+ if len(lines) <= 1 {
+ sshc.logger.Error(fmt.Errorf("ssh response data only has
header"), "result", result)
+ return
+ }
+
+ // Parse the first line as field names
+ fields := strings.Fields(lines[0]) // Use Fields instead of Split to
handle multiple spaces
+ fieldMapping := make(map[string]int, len(fields))
+ for i, field := range fields {
+ fieldMapping[strings.ToLower(strings.TrimSpace(field))] = i
+ }
+
+ sshc.logger.Info("Parsed SSH script header fields", "fields", fields,
"fieldMapping", fieldMapping)
+
+ // Parse subsequent lines as data rows
+ dataRowCount := 0
+ for i := 1; i < len(lines); i++ {
+ line := strings.TrimSpace(lines[i])
+ if line == "" {
+ continue // Skip empty lines
+ }
+
+ values := strings.Fields(line) // Use Fields to handle multiple
spaces
+ valueRow := jobtypes.ValueRow{
+ Columns: make([]string, 0),
+ }
+
+ // Build columns based on aliasFields order
+ for _, alias := range aliasFields {
+ if strings.EqualFold(alias, ResponseTime) {
+ // Add response time for special field
+ valueRow.Columns = append(valueRow.Columns,
fmt.Sprintf("%d", responseTime))
+ } else {
+ // Find the field index in the mapping
+ index, exists :=
fieldMapping[strings.ToLower(alias)]
+ if exists && index < len(values) {
+ valueRow.Columns =
append(valueRow.Columns, values[index])
+ } else {
+ // Add null value if field not found or
index out of range
+ valueRow.Columns =
append(valueRow.Columns, NullValue)
+ sshc.logger.V(1).Info("Field not found
or index out of range",
+ "alias", alias, "exists",
exists, "index", index, "valuesLength", len(values))
+ }
+ }
+ }
+
+ response.Values = append(response.Values, valueRow)
+ dataRowCount++
+ }
+
+ sshc.logger.Info("Parsed SSH script data", "dataRows", dataRowCount,
"aliasFields", aliasFields)
+
+ // Set Fields based on aliasFields
+ for _, alias := range aliasFields {
+ field := jobtypes.Field{
+ Field: alias,
+ Type: 1, // Default type
+ Label: false,
+ Unit: "",
+ }
+ response.Fields = append(response.Fields, field)
+ }
+}
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go
b/internal/collector/common/collect/dispatch/metrics_collector.go
index 0795a8a..1f46485 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -21,7 +21,6 @@ package dispatch
import (
"fmt"
- "net/http"
"time"
// Import basic package with blank identifier to trigger its init()
function
@@ -29,6 +28,7 @@ import (
_ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
@@ -67,7 +67,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
"protocol", metrics.Protocol,
"metricsName", metrics.Name)
- result := mc.createErrorResponse(metrics, job,
http.StatusInternalServerError, fmt.Sprintf("Collector not found: %v", err))
+ result := mc.createErrorResponse(metrics, job,
constants.CollectUnavailable, fmt.Sprintf("Collector not found: %v", err))
resultChan <- result
return
}
@@ -102,7 +102,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
duration := time.Since(startTime)
// Only log failures at INFO level, success at debug level
- if result != nil && result.Code == http.StatusOK {
+ if result != nil && result.Code == constants.CollectSuccess {
mc.logger.V(1).Info("metrics collection completed
successfully",
"metricsName", metrics.Name,
"protocol", metrics.Protocol,
diff --git a/internal/collector/common/collect/lazy_message_router.go
b/internal/collector/common/collect/lazy_message_router.go
index caff0ec..3ad76e1 100644
--- a/internal/collector/common/collect/lazy_message_router.go
+++ b/internal/collector/common/collect/lazy_message_router.go
@@ -183,26 +183,6 @@ func (l *LazyMessageRouter) serializeToArrow(dataList
[]*jobtypes.CollectRepMetr
// createArrowRecordBatch creates Arrow RecordBatch for single MetricsData,
compatible with Java Manager
func (l *LazyMessageRouter) createArrowRecordBatch(mem memory.Allocator, data
*jobtypes.CollectRepMetricsData) (arrow.Record, error) {
- // Create metadata for fields (all fields use the same default metadata)
- emptyMetadata := arrow.MetadataFrom(map[string]string{
- "type": "1",
- "label": "false",
- "unit": "none",
- })
-
- // Define metadata fields (fixed fields)
- metadataFields := []arrow.Field{
- {Name: "app", Type: arrow.BinaryTypes.String, Nullable: true,
Metadata: emptyMetadata},
- {Name: "metrics", Type: arrow.BinaryTypes.String, Nullable:
true, Metadata: emptyMetadata},
- {Name: "id", Type: arrow.BinaryTypes.String, Nullable: true,
Metadata: emptyMetadata},
- {Name: "monitorId", Type: arrow.BinaryTypes.String, Nullable:
true, Metadata: emptyMetadata},
- {Name: "tenantId", Type: arrow.BinaryTypes.String, Nullable:
true, Metadata: emptyMetadata},
- {Name: "priority", Type: arrow.BinaryTypes.String, Nullable:
true, Metadata: emptyMetadata},
- {Name: "time", Type: arrow.BinaryTypes.String, Nullable: true,
Metadata: emptyMetadata},
- {Name: "code", Type: arrow.BinaryTypes.String, Nullable: true,
Metadata: emptyMetadata},
- {Name: "msg", Type: arrow.BinaryTypes.String, Nullable: true,
Metadata: emptyMetadata},
- }
-
// Add dynamic fields (based on collected field definitions)
dynamicFields := make([]arrow.Field, 0, len(data.Fields))
for _, field := range data.Fields {
@@ -230,9 +210,8 @@ func (l *LazyMessageRouter) createArrowRecordBatch(mem
memory.Allocator, data *j
})
}
- // Merge all fields
- allFields := make([]arrow.Field, 0,
len(metadataFields)+len(dynamicFields))
- allFields = append(allFields, metadataFields...)
+ // Merge dynamic fields only, as metadata fields are not needed in
final schema
+ allFields := make([]arrow.Field, 0, len(dynamicFields))
allFields = append(allFields, dynamicFields...)
// Create schema-level metadata
@@ -268,33 +247,20 @@ func (l *LazyMessageRouter) createArrowRecordBatch(mem
memory.Allocator, data *j
// Fill data
for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
- // Fill metadata fields
- builders[0].(*array.StringBuilder).Append(data.App)
- builders[1].(*array.StringBuilder).Append(data.Metrics)
- builders[2].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.ID))
- builders[3].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.MonitorID))
- builders[4].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.TenantID))
- builders[5].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.Priority))
- builders[6].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.Time))
- builders[7].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.Code))
- builders[8].(*array.StringBuilder).Append(data.Msg)
-
// Fill dynamic field data
if rowIdx < len(data.Values) {
valueRow := data.Values[rowIdx]
- for i, _ := range data.Fields {
- builderIdx := len(metadataFields) + i
+ for i := range data.Fields {
var value string
if i < len(valueRow.Columns) {
value = valueRow.Columns[i]
}
-
builders[builderIdx].(*array.StringBuilder).Append(value)
+ builders[i].(*array.StringBuilder).Append(value)
}
} else {
// Fill empty values
for i := range data.Fields {
- builderIdx := len(metadataFields) + i
-
builders[builderIdx].(*array.StringBuilder).Append("")
+ builders[i].(*array.StringBuilder).Append("")
}
}
}
diff --git a/internal/collector/common/collect/result_handler.go
b/internal/collector/common/collect/result_handler.go
index 48cc240..cd16820 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/collector/common/collect/result_handler.go
@@ -21,9 +21,9 @@ package collect
import (
"fmt"
- "net/http"
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
@@ -69,7 +69,7 @@ func (rh *ResultHandlerImpl) HandleCollectData(data
*jobtypes.CollectRepMetricsD
}
// Log successful result sending
- if data.Code == http.StatusOK {
+ if data.Code == constants.CollectSuccess {
rh.logger.V(1).Info("successfully sent collection
result to Manager",
"metricsName", data.Metrics)
} else {
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go
b/internal/collector/common/dispatcher/common_dispatcher.go
index 8d37be5..288e0c1 100644
--- a/internal/collector/common/dispatcher/common_dispatcher.go
+++ b/internal/collector/common/dispatcher/common_dispatcher.go
@@ -68,6 +68,7 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx
context.Context, job *jo
startTime := time.Now()
+ job.ConstructPriorMetrics()
// Get the next metrics to collect based on job priority and
dependencies
metricsToCollect := job.NextCollectMetrics()
if len(metricsToCollect) == 0 {
diff --git a/internal/collector/common/ssh/.keep
b/internal/collector/common/ssh/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/common/ssh/ssh_helper.go
b/internal/collector/common/ssh/ssh_helper.go
new file mode 100644
index 0000000..fbecb76
--- /dev/null
+++ b/internal/collector/common/ssh/ssh_helper.go
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package ssh
+
+import (
+ "context"
+ "crypto/x509"
+ "encoding/pem"
+ "fmt"
+ "net"
+ "strings"
+
+ "golang.org/x/crypto/ssh"
+
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// CreateSSHClientConfig creates SSH client configuration based on the
protocol config
+func CreateSSHClientConfig(config *jobtypes.SSHProtocol, logger logger.Logger)
(*ssh.ClientConfig, error) {
+ clientConfig := &ssh.ClientConfig{
+ User: config.Username,
+ HostKeyCallback: ssh.InsecureIgnoreHostKey(), // Note: In
production, you should verify host keys
+ }
+
+ // Configure authentication
+ if config.PrivateKey != "" && strings.TrimSpace(config.PrivateKey) !=
"" {
+ logger.Info("Using private key authentication")
+ // Use private key authentication
+ signer, err := ParsePrivateKey(config.PrivateKey)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse private key:
%w", err)
+ }
+ clientConfig.Auth = []ssh.AuthMethod{ssh.PublicKeys(signer)}
+ } else if config.Password != "" {
+ logger.Info("Using password authentication")
+ // Use password authentication
+ clientConfig.Auth =
[]ssh.AuthMethod{ssh.Password(config.Password)}
+ } else {
+ return nil, fmt.Errorf("either password or private key must be
provided")
+ }
+
+ return clientConfig, nil
+}
+
+// ParsePrivateKey parses a private key string and returns an ssh.Signer
+func ParsePrivateKey(privateKeyStr string) (ssh.Signer, error) {
+ // Decode the PEM block
+ block, _ := pem.Decode([]byte(privateKeyStr))
+ if block == nil {
+ return nil, fmt.Errorf("failed to decode PEM block from private
key")
+ }
+
+ // Parse the private key
+ var privateKey interface{}
+ var err error
+
+ switch block.Type {
+ case "RSA PRIVATE KEY":
+ privateKey, err = x509.ParsePKCS1PrivateKey(block.Bytes)
+ case "PRIVATE KEY":
+ privateKey, err = x509.ParsePKCS8PrivateKey(block.Bytes)
+ case "EC PRIVATE KEY":
+ privateKey, err = x509.ParseECPrivateKey(block.Bytes)
+ case "OPENSSH PRIVATE KEY":
+ // Use ssh.ParseRawPrivateKey for OpenSSH format
+ privateKey, err = ssh.ParseRawPrivateKey([]byte(privateKeyStr))
+ default:
+ return nil, fmt.Errorf("unsupported private key type: %s",
block.Type)
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse private key: %w", err)
+ }
+
+ // Create SSH signer
+ signer, err := ssh.NewSignerFromKey(privateKey)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create signer from private
key: %w", err)
+ }
+
+ return signer, nil
+}
+
+// createProxyClientConfig creates SSH client configuration for proxy server
+func createProxyClientConfig(config *jobtypes.SSHProtocol, logger
logger.Logger) (*ssh.ClientConfig, error) {
+ clientConfig := &ssh.ClientConfig{
+ User: config.ProxyUsername,
+ HostKeyCallback: ssh.InsecureIgnoreHostKey(),
+ }
+
+ // Configure authentication for proxy
+ if config.ProxyPrivateKey != "" &&
strings.TrimSpace(config.ProxyPrivateKey) != "" {
+ logger.Info("Using proxy private key authentication")
+ // Use private key authentication for proxy
+ signer, err := ParsePrivateKey(config.ProxyPrivateKey)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse proxy private
key: %w", err)
+ }
+ clientConfig.Auth = []ssh.AuthMethod{ssh.PublicKeys(signer)}
+ } else if config.ProxyPassword != "" {
+ logger.Info("Using proxy password authentication")
+ // Use password authentication for proxy
+ clientConfig.Auth =
[]ssh.AuthMethod{ssh.Password(config.ProxyPassword)}
+ } else {
+ return nil, fmt.Errorf("either proxy password or proxy private
key must be provided")
+ }
+
+ return clientConfig, nil
+}
+
+// dialViaProxy establishes SSH connection through a proxy server
+func dialViaProxy(ctx context.Context, config *jobtypes.SSHProtocol,
targetConfig *ssh.ClientConfig, logger logger.Logger) (*ssh.Client, error) {
+ // Create proxy client configuration
+ proxyConfig, err := createProxyClientConfig(config, logger)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create proxy client config:
%w", err)
+ }
+
+ // Set timeout for proxy connection
+ if timeout := targetConfig.Timeout; timeout > 0 {
+ proxyConfig.Timeout = timeout
+ }
+
+ // Connect to proxy server
+ proxyAddress := net.JoinHostPort(config.ProxyHost, config.ProxyPort)
+ logger.Info("Connecting to proxy server", "address", proxyAddress)
+
+ // Create dialer with context for proxy connection
+ dialer := &net.Dialer{}
+ proxyConn, err := dialer.DialContext(ctx, "tcp", proxyAddress)
+ if err != nil {
+ return nil, fmt.Errorf("failed to connect to proxy server %s:
%w", proxyAddress, err)
+ }
+
+ // Create SSH client connection to proxy
+ proxyClientConn, proxyChans, proxyReqs, err :=
ssh.NewClientConn(proxyConn, proxyAddress, proxyConfig)
+ if err != nil {
+ proxyConn.Close()
+ return nil, fmt.Errorf("failed to establish SSH connection to
proxy: %w", err)
+ }
+ proxyClient := ssh.NewClient(proxyClientConn, proxyChans, proxyReqs)
+
+ // Create connection to target server through proxy
+ targetAddress := net.JoinHostPort(config.Host, config.Port)
+ logger.Info("Connecting to target server via proxy", "target",
targetAddress)
+
+ targetConn, err := proxyClient.Dial("tcp", targetAddress)
+ if err != nil {
+ proxyClient.Close()
+ return nil, fmt.Errorf("failed to dial target server %s via
proxy: %w", targetAddress, err)
+ }
+
+ // Create SSH client connection to target server
+ targetClientConn, targetChans, targetReqs, err :=
ssh.NewClientConn(targetConn, targetAddress, targetConfig)
+ if err != nil {
+ targetConn.Close()
+ proxyClient.Close()
+ return nil, fmt.Errorf("failed to establish SSH connection to
target server: %w", err)
+ }
+
+ // Note: We need to keep the proxy client alive, so we don't close it
here
+ // The target client will handle the connection lifecycle
+ return ssh.NewClient(targetClientConn, targetChans, targetReqs), nil
+}
+
+// DialWithContext dials SSH connection with context support
+func DialWithContext(ctx context.Context, network, addr string, config
*ssh.ClientConfig) (*ssh.Client, error) {
+ // Create a dialer with context
+ dialer := &net.Dialer{}
+ conn, err := dialer.DialContext(ctx, network, addr)
+ if err != nil {
+ return nil, err
+ }
+
+ // Create SSH client connection
+ c, chans, reqs, err := ssh.NewClientConn(conn, addr, config)
+ if err != nil {
+ conn.Close()
+ return nil, err
+ }
+
+ return ssh.NewClient(c, chans, reqs), nil
+}
+
+// DialWithProxy dials SSH connection through proxy with context support
+func DialWithProxy(ctx context.Context, sshConfig *jobtypes.SSHProtocol,
clientConfig *ssh.ClientConfig, logger logger.Logger) (*ssh.Client, error) {
+ // TODO: implement Reuse connection logic
+
+ // Check if proxy is enabled and configured
+ if sshConfig.UseProxy != "true" || sshConfig.ProxyHost == "" ||
sshConfig.ProxyPort == "" {
+ // Use direct connection if proxy is not enabled or not
configured
+ address := net.JoinHostPort(sshConfig.Host, sshConfig.Port)
+ return DialWithContext(ctx, "tcp", address, clientConfig)
+ }
+
+ // Validate proxy configuration
+ if sshConfig.ProxyUsername == "" {
+ return nil, fmt.Errorf("proxy username is required when using
proxy")
+ }
+ if sshConfig.ProxyPassword == "" && sshConfig.ProxyPrivateKey == "" {
+ return nil, fmt.Errorf("either proxy password or proxy private
key is required when using proxy")
+ }
+
+ logger.Info("Using proxy connection", "proxyHost", sshConfig.ProxyHost,
"proxyPort", sshConfig.ProxyPort)
+ return dialViaProxy(ctx, sshConfig, clientConfig, logger)
+}
diff --git a/internal/collector/common/types/job/job_types.go
b/internal/collector/common/types/job/job_types.go
index 70c3c10..aca80cd 100644
--- a/internal/collector/common/types/job/job_types.go
+++ b/internal/collector/common/types/job/job_types.go
@@ -17,6 +17,14 @@
package job
+import (
+ "log"
+ "math"
+ "sort"
+ "sync"
+ "time"
+)
+
// hertzbeat Collect Job related types
// Job represents a complete monitoring job
@@ -49,8 +57,10 @@ type Job struct {
// Internal fields
EnvConfigmaps map[string]Configmap `json:"-"`
DispatchTime int64 `json:"-"`
- PriorMetrics []Metrics `json:"-"`
+ PriorMetrics [][]*Metrics `json:"-"` // LinkedList of
Set<Metrics>
ResponseDataTemp []MetricsData `json:"-"`
+
+ mu sync.Mutex // Mutex for thread-safe operations
}
// NextCollectMetrics returns the metrics that should be collected next
@@ -65,7 +75,85 @@ func (j *Job) NextCollectMetrics() []*Metrics {
}
// ConstructPriorMetrics prepares prior metrics for collection dependencies
+// This method filters metrics that need to be collected based on time,
+// sets default values, groups by priority, and constructs an ordered
collection queue
func (j *Job) ConstructPriorMetrics() {
- j.PriorMetrics = make([]Metrics, len(j.Metrics))
- copy(j.PriorMetrics, j.Metrics)
+ j.mu.Lock()
+ defer j.mu.Unlock()
+
+ now := time.Now().UnixMilli()
+
+ // Filter metrics that need to be collected and set default values
+ currentCollectMetrics := make(map[int][]*Metrics)
+
+ for i := range j.Metrics {
+ metric := &j.Metrics[i]
+
+ // Check if it's time to collect this metric
+ if now >= metric.CollectTime+metric.Interval*1000 {
+ // Update collect time
+ metric.CollectTime = now
+
+ // Set default AliasFields if not configured
+ if len(metric.AliasFields) == 0 && len(metric.Fields) >
0 {
+ aliasFields := make([]string,
len(metric.Fields))
+ for idx, field := range metric.Fields {
+ aliasFields[idx] = field.Field
+ }
+ metric.AliasFields = aliasFields
+ }
+
+ // Group by priority
+ priority := metric.Priority
+ currentCollectMetrics[priority] =
append(currentCollectMetrics[priority], metric)
+ }
+ }
+
+ // If no metrics need to be collected, add the default availability
metric (priority 0)
+ if len(currentCollectMetrics) == 0 {
+ for i := range j.Metrics {
+ metric := &j.Metrics[i]
+ if metric.Priority == 0 {
+ metric.CollectTime = now
+ currentCollectMetrics[0] = []*Metrics{metric}
+ break
+ }
+ }
+
+ // If still empty, log error
+ if len(currentCollectMetrics) == 0 {
+ log.Println("ERROR: metrics must has one priority 0
metrics at least.")
+ }
+ }
+
+ // Construct a linked list (slice) of task execution order of the
metrics
+ // Each element is a set (slice) of metrics with the same priority
+ j.PriorMetrics = make([][]*Metrics, 0, len(currentCollectMetrics))
+
+ for _, metricsSet := range currentCollectMetrics {
+ // Make a copy to ensure thread safety
+ metricsCopy := make([]*Metrics, len(metricsSet))
+ copy(metricsCopy, metricsSet)
+ j.PriorMetrics = append(j.PriorMetrics, metricsCopy)
+ }
+
+ // Sort by priority (ascending order)
+ sort.Slice(j.PriorMetrics, func(i, k int) bool {
+ // Get priority from the first metric in each set
+ var iPriority, kPriority int = math.MaxInt, math.MaxInt
+
+ if len(j.PriorMetrics[i]) > 0 && j.PriorMetrics[i][0] != nil {
+ iPriority = j.PriorMetrics[i][0].Priority
+ }
+ if len(j.PriorMetrics[k]) > 0 && j.PriorMetrics[k][0] != nil {
+ kPriority = j.PriorMetrics[k][0].Priority
+ }
+
+ return iPriority < kPriority
+ })
+
+ // Initialize EnvConfigmaps
+ if j.EnvConfigmaps == nil {
+ j.EnvConfigmaps = make(map[string]Configmap, 8)
+ }
}
diff --git a/internal/collector/common/types/job/metrics_types.go
b/internal/collector/common/types/job/metrics_types.go
index 04f035e..ef88be8 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/collector/common/types/job/metrics_types.go
@@ -31,9 +31,8 @@ type Metrics struct {
Interval int64 `json:"interval"`
Visible *bool `json:"visible"`
Fields []Field `json:"fields"`
- Aliasfields []string `json:"aliasfields"`
- AliasFields []string `json:"aliasFields"` // Alternative field
name
- Calculates interface{} `json:"calculates"` // Can be
[]Calculate or []string
+ AliasFields []string `json:"aliasFields"`
+ Calculates interface{} `json:"calculates"` // Can be []Calculate
or []string
Filters interface{} `json:"filters"`
Units interface{} `json:"units"` // Can be []Unit or
[]string
Protocol string `json:"protocol"`
@@ -46,7 +45,7 @@ type Metrics struct {
// Protocol specific fields
HTTP *HTTPProtocol `json:"http,omitempty"`
- SSH *SSHProtocol `json:"ssh,omitempty"`
+ SSH interface{} `json:"ssh,omitempty"` // Can be SSHProtocol
JDBC interface{} `json:"jdbc,omitempty"` // Can be JDBCProtocol
or map[string]interface{}
SNMP *SNMPProtocol `json:"snmp,omitempty"`
JMX *JMXProtocol `json:"jmx,omitempty"`
@@ -185,14 +184,23 @@ type HTTPProtocol struct {
// SSHProtocol represents SSH protocol configuration
//
// All fields are strings because they are populated from job parameter
// templates before type conversion (numeric fields like Port/Timeout and
// boolean flags like UseProxy carry their values as text).
type SSHProtocol struct {
	// Host is the target SSH host to connect to.
	Host string `json:"host"`
	// Port is the target SSH port, as a string (e.g. "22").
	Port string `json:"port"`
	// Username is the login user on the target host.
	Username string `json:"username"`
	// Password is the login password; may be empty when PrivateKey is used.
	Password string `json:"password"`
	// PrivateKey is the private key material for key-based authentication.
	PrivateKey string `json:"privateKey"`
	// PrivateKeyPassphrase decrypts an encrypted PrivateKey, if needed.
	PrivateKeyPassphrase string `json:"privateKeyPassphrase"`
	// Script is the remote command/script executed for collection.
	Script string `json:"script"`
	// ParseType selects how the script output is parsed.
	// NOTE(review): supported values not visible here — see the SSH collector.
	ParseType string `json:"parseType"`
	// ParseScript is an additional expression/script used during parsing.
	ParseScript string `json:"parseScript"`
	// Timeout is the connection/execution timeout, as a string.
	Timeout string `json:"timeout"`
	// ReuseConnection enables connection reuse when set to "true"
	// (presumably; reuse logic is still a TODO in the dial helpers).
	ReuseConnection string `json:"reuseConnection"`
	// UseProxy enables dialing through a proxy host when exactly "true".
	UseProxy string `json:"useProxy"`
	// ProxyHost is the proxy (jump) host address.
	ProxyHost string `json:"proxyHost"`
	// ProxyPort is the proxy port, as a string.
	ProxyPort string `json:"proxyPort"`
	// ProxyUsername is required whenever UseProxy is enabled.
	ProxyUsername string `json:"proxyUsername"`
	// ProxyPassword authenticates to the proxy; ProxyPrivateKey is the
	// alternative — at least one of the two must be set when proxying.
	ProxyPassword string `json:"proxyPassword"`
	// ProxyPrivateKey is the key material for proxy authentication.
	ProxyPrivateKey string `json:"proxyPrivateKey"`
}
// JDBCProtocol represents JDBC protocol configuration
@@ -328,12 +336,6 @@ func (j *Job) Clone() *Job {
copy(clone.Metrics[i].Fields, metric.Fields)
}
- // Deep copy Aliasfields slice
- if metric.Aliasfields != nil {
- clone.Metrics[i].Aliasfields = make([]string,
len(metric.Aliasfields))
- copy(clone.Metrics[i].Aliasfields,
metric.Aliasfields)
- }
-
// Deep copy AliasFields slice
if metric.AliasFields != nil {
clone.Metrics[i].AliasFields = make([]string,
len(metric.AliasFields))
diff --git a/internal/constants/const.go b/internal/constants/const.go
index 94f87d8..1376097 100644
--- a/internal/constants/const.go
+++ b/internal/constants/const.go
@@ -42,3 +42,13 @@ const (
const (
	// CollectorModule is the module name for the collector component
	// (presumably used for logging/identification — confirm at call sites).
	CollectorModule = "collector"
)
+
// Collect Response status code
//
// NOTE(review): these values appear to mirror the Hertzbeat server's
// CollectRep response codes — confirm they stay in sync with the Java side.
const (
	// CollectSuccess indicates the collection completed successfully.
	CollectSuccess = 0
	// CollectUnavailable indicates the collector itself is unavailable.
	CollectUnavailable = 1
	// CollectUnReachable indicates the target could not be reached (network).
	CollectUnReachable = 2
	// CollectUnConnectable indicates the target was reachable but a
	// connection could not be established.
	CollectUnConnectable = 3
	// CollectFail indicates the collection ran but failed.
	CollectFail = 4
	// CollectTimeout indicates the collection exceeded its time budget.
	CollectTimeout = 5
)
diff --git a/internal/util/param/param_replacer.go
b/internal/util/param/param_replacer.go
index ff6a944..1b23781 100644
--- a/internal/util/param/param_replacer.go
+++ b/internal/util/param/param_replacer.go
@@ -188,6 +188,12 @@ func (r *Replacer) replaceMetricsParams(metrics
*jobtypes.Metrics, paramMap map[
// For now, other protocols (HTTP, SSH, etc.) are concrete struct
pointers and
// don't require parameter replacement
+ if metrics.SSH != nil {
+ if err := r.replaceProtocolParams(&metrics.SSH, paramMap); err
!= nil {
+ return fmt.Errorf("failed to replace SSH params: %w",
err)
+ }
+ }
+
// Replace parameters in basic metrics fields
if err := r.replaceBasicMetricsParams(metrics, paramMap); err != nil {
return fmt.Errorf("failed to replace basic metrics params: %w",
err)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]