This is an automated email from the ASF dual-hosted git repository.
shown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new ed20c5c feat: enhance JDBC collector with SSH tunnel support and
security val… (#27)
ed20c5c is described below
commit ed20c5ce69dd38a0405ea8e13b86e665bc7c8bd8
Author: Logic <[email protected]>
AuthorDate: Tue Nov 11 20:05:37 2025 +0800
feat: enhance JDBC collector with SSH tunnel support and security val… (#27)
---
go.mod | 1 +
go.sum | 2 +
.../collector/basic/database/jdbc_collector.go | 864 ++++++++++++++++-----
internal/collector/basic/ssh/ssh_collector.go | 2 +-
.../common/collect/dispatch/metrics_collector.go | 16 +-
.../collect/dispatch/metrics_collector_test.go | 96 +--
.../collector/common/types/job/metrics_types.go | 24 +-
internal/constants/const.go | 106 ++-
8 files changed, 814 insertions(+), 297 deletions(-)
diff --git a/go.mod b/go.mod
index 560318f..b924b9a 100644
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,7 @@ require (
github.com/lib/pq v1.10.9
github.com/microsoft/go-mssqldb v1.9.3
github.com/prometheus/client_golang v1.23.0
+ github.com/sijms/go-ora/v2 v2.9.0
github.com/spf13/cobra v1.10.1
github.com/stretchr/testify v1.10.0
go.uber.org/zap v1.27.0
diff --git a/go.sum b/go.sum
index 2587cf1..b54c994 100644
--- a/go.sum
+++ b/go.sum
@@ -82,6 +82,8 @@ github.com/prometheus/procfs v0.16.1/go.mod
h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlT
github.com/rogpeppe/go-internal v1.13.1
h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod
h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/russross/blackfriday/v2 v2.1.0/go.mod
h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/sijms/go-ora/v2 v2.9.0
h1:+iQbUeTeCOFMb5BsOMgUhV8KWyrv9yjKpcK4x7+MFrg=
+github.com/sijms/go-ora/v2 v2.9.0/go.mod
h1:QgFInVi3ZWyqAiJwzBQA+nbKYKH77tdp1PYoCqhR2dU=
github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s=
github.com/spf13/cobra v1.10.1/go.mod
h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0=
github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
diff --git a/internal/collector/basic/database/jdbc_collector.go
b/internal/collector/basic/database/jdbc_collector.go
index 86f01be..f91d79d 100644
--- a/internal/collector/basic/database/jdbc_collector.go
+++ b/internal/collector/basic/database/jdbc_collector.go
@@ -1,33 +1,44 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package database implements JDBC (sql.DB) based database collection.
package database
import (
"context"
"database/sql"
"fmt"
+ "io"
+ "log"
+ "net"
+ "net/url"
+ "regexp"
"strconv"
"strings"
+ "sync"
"time"
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
+ pq "github.com/lib/pq"
_ "github.com/microsoft/go-mssqldb"
+ _ "github.com/sijms/go-ora/v2" // Oracle driver
+ "golang.org/x/crypto/ssh"
+
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
@@ -49,14 +60,245 @@ const (
PlatformPostgreSQL = "postgresql"
PlatformSQLServer = "sqlserver"
PlatformOracle = "oracle"
+ PlatformClickHouse = "clickhouse" // Placeholder, Go driver DSN TBD
+ PlatformDM = "dm" // Placeholder, Go driver DSN TBD
// Response codes
CodeSuccess = constants.CollectSuccess
CodeFail = constants.CollectFail
CodeUnReachable = constants.CollectUnReachable
CodeUnConnectable = constants.CollectUnConnectable
+
+ // PostgreSQL specific SQLState for unreachable
+ // See:
https://www.postgresql.org/docs/current/errcodes-appendix.html#ERRCODES-TABLE
+ pqUnreachableCode = "08001" //
SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
)
+// --- Security Check Constants ---
+// Ported from Java version (JdbcCommonCollect)
+
+var (
+ // VULNERABLE_KEYWORDS Dangerous parameters prohibited in the URL
+ vulnerableKeywords = []string{"allowLoadLocalInfile",
"allowLoadLocalInfileInPath", "useLocalInfile"}
+
+ // BLACK_LIST Dangerous keywords prohibited in the URL
+ blackList = []string{
+ // Dangerous SQL commands
+ "create trigger", "create alias", "runscript from", "shutdown",
"drop table",
+ "drop database", "create function", "alter system", "grant
all", "revoke all",
+
+ // File IO
+ "allowloadlocalinfile", "allowloadlocalinfileinpath",
"uselocalinfile",
+
+ // Code execution
+ "init=", "javaobjectserializer=", "runscript",
"serverstatusdiffinterceptor",
+ "queryinterceptors=", "statementinterceptors=",
"exceptioninterceptors=",
+ "xp_cmdshell", "create function", "dbms_java", "sp_sysexecute",
"load_file",
+
+ // Multiple statements
+ "allowmultiqueries",
+
+ // Deserialization
+ "autodeserialize", "detectcustomcollations",
+ }
+
+ // universalBypassPatterns Universal bypass patterns (simplified
version, focusing on whitespace bypass)
+ universalBypassPatterns = []string{
+ `(?i)create\s+trigger`,
+ `(?i)create\s+function`,
+ `(?i)drop\s+table`,
+ `(?i)drop\s+database`,
+ `(?i)run\s+script`,
+ `(?i)alter\s+system`,
+ `(?i)grant\s+all`,
+ `(?i)revoke\s+all`,
+ `(?i)xp\s*cmdshell`,
+ `(?i)load\s+file`,
+ }
+
+ // platformBypassPatterns Platform-specific bypass patterns (simplified
version)
+ platformBypassPatterns = map[string][]string{
+ "mysql": {
+ `(?i)allow\s+load\s+local\s+infile`,
+ `(?i)allow\s+multi\s+queries`,
+ `(?i)query\s+interceptors`,
+ `(?i)auto\s+deserialize`,
+ },
+ "mariadb": {
+ `(?i)allow\s+load\s+local\s+infile`,
+ `(?i)allow\s+multi\s+queries`,
+ `(?i)query\s+interceptors`,
+ `(?i)auto\s+deserialize`,
+ },
+ "postgresql": {
+ `(?i)socket\s+factory`,
+ `(?i)logger\s+file`,
+ `(?i)logger\s+level`,
+ },
+ }
+
+ // Compiled regular expressions
+ compiledUniversalBypassPatterns []*regexp.Regexp
+ compiledPlatformBypassPatterns map[string][]*regexp.Regexp
+
+ // Used to clean invalid characters from URL
+ invalidCharRegex = regexp.MustCompile(`[\x00-\x1F\x7F\xA0]`)
+
+ // Ensures regex patterns are compiled only once
+ compilePatternsOnce sync.Once
+)
+
+// initPatterns compiles all security check regular expressions
+func initPatterns() {
+ compiledUniversalBypassPatterns = make([]*regexp.Regexp,
len(universalBypassPatterns))
+ for i, pattern := range universalBypassPatterns {
+ compiled, err := regexp.Compile(pattern)
+ if err != nil {
+ log.Fatalf("Failed to compile universal regex pattern
%q: %v", pattern, err)
+ }
+ compiledUniversalBypassPatterns[i] = compiled
+ }
+
+ compiledPlatformBypassPatterns = make(map[string][]*regexp.Regexp)
+ for platform, patterns := range platformBypassPatterns {
+ compiledPlatformBypassPatterns[platform] =
make([]*regexp.Regexp, len(patterns))
+ for i, pattern := range patterns {
+ compiled, err := regexp.Compile(pattern)
+ if err != nil {
+ log.Fatalf("Failed to compile platform regex
pattern %q for %s: %v", pattern, platform, err)
+ }
+ compiledPlatformBypassPatterns[platform][i] = compiled
+ }
+ }
+}
+
+// --- SSH Tunnel ---
+
+// sshTunnelHelper manages an SSH tunnel connection
+type sshTunnelHelper struct {
+ client *ssh.Client
+ listener net.Listener
+ logger logger.Logger
+}
+
+// startSSHTunnel starts an SSH tunnel
+// It connects to the SSH bastion, listens on a local random port,
+// and forwards traffic to the target database.
+func startSSHTunnel(config *jobtypes.SSHTunnel, remoteHost, remotePort string,
timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string, string,
error) {
+ sshHost := config.Host
+ sshPort, err := strconv.Atoi(config.Port)
+ if err != nil {
+ return nil, "", "", fmt.Errorf("invalid SSH port: %v", err)
+ }
+ sshUser := config.Username
+ sshPassword := config.Password
+
+ // 1. Set up SSH client configuration
+ sshConfig := &ssh.ClientConfig{
+ User: sshUser,
+ Auth: []ssh.AuthMethod{
+ ssh.Password(sshPassword),
+ },
+ HostKeyCallback: ssh.InsecureIgnoreHostKey(), // Ignore host
key validation
+ Timeout: timeout,
+ }
+
+ sshAddr := fmt.Sprintf("%s:%d", sshHost, sshPort)
+ log.V(1).Info("starting SSH tunnel", "sshAddr", sshAddr, "remoteHost",
remoteHost, "remotePort", remotePort)
+
+ // 2. Connect to the SSH bastion
+ client, err := ssh.Dial("tcp", sshAddr, sshConfig)
+ if err != nil {
+ return nil, "", "", fmt.Errorf("failed to dial SSH server: %w",
err)
+ }
+
+ // 3. Listen on a random local port on 127.0.0.1
+ listener, err := net.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ client.Close()
+ return nil, "", "", fmt.Errorf("failed to listen on local port:
%w", err)
+ }
+
+ localAddr := listener.Addr().String()
+ localHost, localPort, err := net.SplitHostPort(localAddr)
+ if err != nil {
+ client.Close()
+ listener.Close()
+ return nil, "", "", fmt.Errorf("failed to parse local address:
%w", err)
+ }
+
+ helper := &sshTunnelHelper{
+ client: client,
+ listener: listener,
+ logger: log,
+ }
+
+ // 4. Start a goroutine to accept local connections and forward them
+ go helper.runForwarder(remoteHost, remotePort)
+
+ log.V(1).Info("SSH tunnel started", "localAddr", localAddr, "remote",
fmt.Sprintf("%s:%s", remoteHost, remotePort))
+ return helper, localHost, localPort, nil
+}
+
+// runForwarder loops to accept and forward local connections
+func (t *sshTunnelHelper) runForwarder(remoteHost, remotePort string) {
+ remoteAddr := fmt.Sprintf("%s:%s", remoteHost, remotePort)
+ for {
+ localConn, err := t.listener.Accept()
+ if err != nil {
+ // If the listener is closed, exit the loop
+ if strings.Contains(err.Error(), "use of closed network
connection") {
+ break
+ }
+ t.logger.Error(err, "failed to accept local connection")
+ continue
+ }
+
+ // Start a goroutine to handle forwarding
+ go t.forwardConnection(localConn, remoteAddr)
+ }
+}
+
+// forwardConnection handles forwarding for a single connection
+func (t *sshTunnelHelper) forwardConnection(localConn net.Conn, remoteAddr
string) {
+ defer localConn.Close()
+
+ // 3. Connect to the remote database via the SSH tunnel
+ remoteConn, err := t.client.Dial("tcp", remoteAddr)
+ if err != nil {
+ t.logger.Error(err, "failed to dial remote address via SSH",
"remoteAddr", remoteAddr)
+ return
+ }
+ defer remoteConn.Close()
+
+ // 4. Bidirectionally copy data
+ go func() {
+ _, err := io.Copy(remoteConn, localConn)
+ if err != nil && err != io.EOF {
+ // t.logger.V(1).Error(err, "error copying local to
remote")
+ }
+ remoteConn.Close()
+ localConn.Close()
+ }()
+ go func() {
+ _, err := io.Copy(localConn, remoteConn)
+ if err != nil && err != io.EOF {
+ // t.logger.V(1).Error(err, "error copying remote to
local")
+ }
+ remoteConn.Close()
+ localConn.Close()
+ }()
+}
+
+// Close shuts down the SSH tunnel
+func (t *sshTunnelHelper) Close() {
+ t.listener.Close()
+ t.client.Close()
+ t.logger.V(1).Info("SSH tunnel closed")
+}
+
+// --- JDBC Collector ---
+
// JDBCCollector implements JDBC database collection
type JDBCCollector struct {
logger logger.Logger
@@ -64,13 +306,14 @@ type JDBCCollector struct {
// NewJDBCCollector creates a new JDBC collector
func NewJDBCCollector(logger logger.Logger) *JDBCCollector {
+ // Ensure security patterns are compiled only once
+ compilePatternsOnce.Do(initPatterns)
return &JDBCCollector{
logger: logger.WithName("jdbc-collector"),
}
}
// extractJDBCConfig extracts JDBC configuration from interface{} type
-// This function uses the parameter replacer for consistent configuration
extraction
func extractJDBCConfig(jdbcInterface interface{}) (*jobtypes.JDBCProtocol,
error) {
replacer := param.NewReplacer()
return replacer.ExtractJDBCConfig(jdbcInterface)
@@ -81,7 +324,6 @@ func (jc *JDBCCollector) PreCheck(metrics *jobtypes.Metrics)
error {
if metrics == nil {
return fmt.Errorf("metrics is nil")
}
-
if metrics.JDBC == nil {
return fmt.Errorf("JDBC protocol configuration is required")
}
@@ -95,8 +337,19 @@ func (jc *JDBCCollector) PreCheck(metrics
*jobtypes.Metrics) error {
return fmt.Errorf("JDBC configuration is required")
}
- // Validate required fields when URL is not provided
- if jdbcConfig.URL == "" {
+ // 1. Validate SSH tunnel configuration
+ if err := jc.checkTunnelParam(jdbcConfig.SSHTunnel); err != nil {
+ return err
+ }
+
+ // 2. Validate URL
+ if jdbcConfig.URL != "" {
+ // If a full URL is provided, perform strict security checks
+ if err := validateURL(jdbcConfig.URL, jdbcConfig.Platform); err
!= nil {
+ return err
+ }
+ } else {
+ // If URL is not provided, check basic host/port/platform
if jdbcConfig.Host == "" {
return fmt.Errorf("host is required when URL is not
provided")
}
@@ -108,99 +361,140 @@ func (jc *JDBCCollector) PreCheck(metrics
*jobtypes.Metrics) error {
}
}
- // Validate platform
+ // 3. Validate platform (Platform can be empty if URL is provided,
+ // but required if URL needs to be auto-built)
if jdbcConfig.Platform != "" {
switch jdbcConfig.Platform {
case PlatformMySQL, PlatformMariaDB, PlatformPostgreSQL,
PlatformSQLServer, PlatformOracle:
// Valid platforms
+ case PlatformClickHouse, PlatformDM:
+ // Platforms not yet fully supported
+ jc.logger.Info("warning: platform %s is not fully
supported in Go collector yet", jdbcConfig.Platform)
default:
return fmt.Errorf("unsupported database platform: %s",
jdbcConfig.Platform)
}
}
- // Validate query type
- if jdbcConfig.QueryType != "" {
- switch jdbcConfig.QueryType {
- case QueryTypeOneRow, QueryTypeMultiRow, QueryTypeColumns,
QueryTypeRunScript:
- // Valid query types
- default:
- return fmt.Errorf("unsupported query type: %s",
jdbcConfig.QueryType)
- }
+ // 4. Validate query type
+ if jdbcConfig.QueryType == "" {
+ // Default to oneRow
+ jdbcConfig.QueryType = QueryTypeOneRow
+ }
+ switch jdbcConfig.QueryType {
+ case QueryTypeOneRow, QueryTypeMultiRow, QueryTypeColumns,
QueryTypeRunScript:
+ // Valid query types
+ default:
+ return fmt.Errorf("unsupported query type: %s",
jdbcConfig.QueryType)
}
- // Validate SQL for most query types
- if jdbcConfig.QueryType != QueryTypeRunScript && jdbcConfig.SQL == "" {
- return fmt.Errorf("SQL is required for query type: %s",
jdbcConfig.QueryType)
+ // 5. Validate SQL (The SQL field for runScript contains the script
content and must also be checked)
+ if jdbcConfig.SQL == "" {
+ return fmt.Errorf("SQL/Script content is required for query
type: %s", jdbcConfig.QueryType)
}
return nil
}
+// checkTunnelParam validates SSH tunnel configuration
+func (jc *JDBCCollector) checkTunnelParam(config *jobtypes.SSHTunnel) error {
+ if config == nil {
+ return nil
+ }
+ enable, _ := strconv.ParseBool(config.Enable)
+ if !enable {
+ return nil
+ }
+ if config.Host == "" || config.Port == "" || config.Username == "" {
+ return fmt.Errorf("ssh tunnel host, port, and username are
required when enabled")
+ }
+ return nil
+}
+
// Collect performs JDBC metrics collection
func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics)
*jobtypes.CollectRepMetricsData {
startTime := time.Now()
- // Extract JDBC configuration
+ // 1. Extract configuration
jdbcConfig, err := extractJDBCConfig(metrics.JDBC)
if err != nil {
jc.logger.Error(err, "failed to extract JDBC config")
return jc.createFailResponse(metrics, CodeFail,
fmt.Sprintf("JDBC config error: %v", err))
}
- if jdbcConfig == nil {
- return jc.createFailResponse(metrics, CodeFail, "JDBC
configuration is required")
- }
-
- // Debug level only for collection start
- jc.logger.V(1).Info("starting JDBC collection",
- "host", jdbcConfig.Host,
- "platform", jdbcConfig.Platform,
- "queryType", jdbcConfig.QueryType)
- // Get timeout
+ // 2. Get timeout
timeout := jc.getTimeout(jdbcConfig.Timeout)
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
- // Get database URL
- databaseURL, err := jc.constructDatabaseURL(jdbcConfig)
+ // 3. Set up SSH tunnel (if needed)
+ localHost := jdbcConfig.Host
+ localPort := jdbcConfig.Port
+
+ if jdbcConfig.SSHTunnel != nil {
+ enableTunnel, _ :=
strconv.ParseBool(jdbcConfig.SSHTunnel.Enable)
+ if enableTunnel {
+ tunnel, lHost, lPort, err :=
startSSHTunnel(jdbcConfig.SSHTunnel, jdbcConfig.Host, jdbcConfig.Port, timeout,
jc.logger)
+ if err != nil {
+ jc.logger.Error(err, "failed to start SSH
tunnel")
+ return jc.createFailResponse(metrics,
CodeUnConnectable, fmt.Sprintf("SSH tunnel error: %v", err))
+ }
+ defer tunnel.Close()
+ localHost = lHost
+ localPort = lPort
+ }
+ }
+
+ // 4. Construct database URL
+ databaseURL, err := jc.constructDatabaseURL(jdbcConfig, localHost,
localPort)
if err != nil {
jc.logger.Error(err, "failed to construct database URL")
return jc.createFailResponse(metrics, CodeFail,
fmt.Sprintf("Database URL error: %v", err))
}
- // Create database connection
- db, err := jc.getConnection(databaseURL, jdbcConfig.Username,
jdbcConfig.Password, timeout)
+ // 5. Create database connection
+ db, err := jc.getConnection(ctx, databaseURL, jdbcConfig.Platform,
timeout)
if err != nil {
jc.logger.Error(err, "failed to connect to database")
+ // Ping failed inside getConnection, return UnConnectable
return jc.createFailResponse(metrics, CodeUnConnectable,
fmt.Sprintf("Connection error: %v", err))
}
defer db.Close()
- // Execute query based on type with context
+ // 6. Execute query
response := jc.createSuccessResponse(metrics)
-
switch jdbcConfig.QueryType {
case QueryTypeOneRow:
- err = jc.queryOneRow(db, jdbcConfig.SQL, metrics.AliasFields,
response)
+ err = jc.queryOneRow(ctx, db, jdbcConfig.SQL,
metrics.AliasFields, response)
case QueryTypeMultiRow:
- err = jc.queryMultiRow(db, jdbcConfig.SQL, metrics.AliasFields,
response)
+ err = jc.queryMultiRow(ctx, db, jdbcConfig.SQL,
metrics.AliasFields, response)
case QueryTypeColumns:
- err = jc.queryColumns(db, jdbcConfig.SQL, metrics.AliasFields,
response)
+ err = jc.queryColumns(ctx, db, jdbcConfig.SQL,
metrics.AliasFields, response)
case QueryTypeRunScript:
- err = jc.runScript(db, jdbcConfig.SQL, response)
+ err = jc.runScript(ctx, db, jdbcConfig.SQL, response)
default:
err = fmt.Errorf("unsupported query type: %s",
jdbcConfig.QueryType)
}
+ // 7. Process query results
+ duration := time.Since(startTime)
if err != nil {
jc.logger.Error(err, "query execution failed", "queryType",
jdbcConfig.QueryType)
+ // Check for specific errors
+ if pqErr, ok := err.(*pq.Error); ok {
+ if pqErr.Code == pqUnreachableCode {
+ return jc.createFailResponse(metrics,
CodeUnReachable, fmt.Sprintf("Query error: %v (Code: %s)", err, pqErr.Code))
+ }
+ }
+ if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+ return jc.createFailResponse(metrics, CodeUnReachable,
fmt.Sprintf("Query timeout: %v", err))
+ }
return jc.createFailResponse(metrics, CodeFail,
fmt.Sprintf("Query error: %v", err))
}
- duration := time.Since(startTime)
- // Debug level only for successful completion
- jc.logger.V(1).Info("JDBC collection completed",
- "duration", duration,
- "rowCount", len(response.Values))
+ // Inject response time (if response_time field exists)
+ jc.addResponseTime(response, metrics.AliasFields, duration)
+ jc.logger.V(1).Info("JDBC collection completed", "duration", duration,
"rowCount", len(response.Values))
return response
}
@@ -215,11 +509,12 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string)
time.Duration {
return 30 * time.Second // default timeout
}
- // First try parsing as duration string (e.g., "10s", "5m", "500ms")
+ // Try parsing as Go duration string (e.g., "10s", "5m", "500ms")
if duration, err := time.ParseDuration(timeoutStr); err == nil {
return duration
}
+ // Try parsing as Java version's milliseconds
if timeout, err := strconv.Atoi(timeoutStr); err == nil {
return time.Duration(timeout) * time.Millisecond
}
@@ -227,61 +522,171 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string)
time.Duration {
return 30 * time.Second // fallback to default
}
-// constructDatabaseURL constructs the database connection URL
-func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol)
(string, error) {
+// recursiveDecode recursively decodes a URL (max 5 times)
+func recursiveDecode(s string) (string, error) {
+ prev := s
+ for i := 0; i < 5; i++ {
+ decoded, err := url.QueryUnescape(prev)
+ if err != nil {
+ // If decoding fails, return the last successfully
decoded version
+ return prev, err
+ }
+ if decoded == prev {
+ // If no change after decoding, decoding is complete
+ return decoded, nil
+ }
+ prev = decoded
+ }
+ return prev, nil
+}
+
+// validateURL performs strict URL security validation
+func validateURL(rawURL string, platform string) error {
+ // 1. Length limit
+ if len(rawURL) > 2048 {
+ return fmt.Errorf("JDBC URL length exceeds maximum limit of
2048 characters")
+ }
+
+ // 2. Clean invisible characters
+ cleanedUrl := invalidCharRegex.ReplaceAllString(rawURL, "")
+
+ // 3. Recursive decoding
+ decodedUrl, err := recursiveDecode(cleanedUrl)
+ if err != nil {
+ // We can't use jc.logger here as it's a static function
+ // log.Printf("URL decoding error: %v", err)
+ // Continue checking with the pre-decoded string even if
decoding fails
+ decodedUrl = cleanedUrl
+ }
+
+ urlLowerCase := strings.ToLower(decodedUrl)
+
+ // 4. Check VULNERABLE_KEYWORDS
+ for _, keyword := range vulnerableKeywords {
+ if strings.Contains(urlLowerCase, keyword) {
+ return fmt.Errorf("JDBC URL prohibit contains
vulnerable param %s", keyword)
+ }
+ }
+
+ // 5. Check BLACK_LIST
+ for _, keyword := range blackList {
+ if strings.Contains(urlLowerCase, keyword) {
+ return fmt.Errorf("invalid JDBC URL: contains
potentially malicious parameter: %s", keyword)
+ }
+ }
+
+ // 6. Check JNDI/Deserialization (simplified)
+ if strings.Contains(urlLowerCase, "jndi:") ||
strings.Contains(urlLowerCase, "ldap:") || strings.Contains(urlLowerCase,
"rmi:") {
+ return fmt.Errorf("invalid JDBC URL: contains potentially
malicious JNDI or deserialization parameter")
+ }
+
+ // 7. Check universal bypass patterns
+ for _, pattern := range compiledUniversalBypassPatterns {
+ if pattern.MatchString(urlLowerCase) {
+ return fmt.Errorf("invalid JDBC URL: contains
potentially malicious bypass pattern: %s", pattern.String())
+ }
+ }
+
+ // 8. Check platform-specific bypass patterns
+ if platform != "" {
+ if patterns, ok :=
compiledPlatformBypassPatterns[strings.ToLower(platform)]; ok {
+ for _, pattern := range patterns {
+ if pattern.MatchString(urlLowerCase) {
+ return fmt.Errorf("invalid %s JDBC URL:
contains potentially malicious bypass pattern: %s", platform, pattern.String())
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
+// constructDatabaseURL constructs the database connection URL/DSN
+func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol,
host, port string) (string, error) {
+ // 1. If user provided a full URL, use it (already validated in
PreCheck)
+ if jdbc.URL != "" {
+ // Validate again to prevent bypassing PreCheck
+ if err := validateURL(jdbc.URL, jdbc.Platform); err != nil {
+ return "", err
+ }
+ return jdbc.URL, nil
+ }
- // Construct URL based on platform
- host := jdbc.Host
- port := jdbc.Port
+ // 2. Auto-build DSN based on platform
database := jdbc.Database
+ username := jdbc.Username
+ password := jdbc.Password
switch jdbc.Platform {
case PlatformMySQL, PlatformMariaDB:
- // MySQL DSN format:
[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN]
- return
fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4",
- jdbc.Username, jdbc.Password, host, port, database), nil
+ // MySQL DSN:
[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN]
+ dsn :=
fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4&timeout=30s&readTimeout=30s&writeTimeout=30s",
+ username, password, host, port, database)
+ // runScript requires multiStatements support
+ if jdbc.QueryType == QueryTypeRunScript {
+ dsn += "&multiStatements=true"
+ }
+ return dsn, nil
case PlatformPostgreSQL:
- return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable",
- jdbc.Username, jdbc.Password, host, port, database), nil
+ // PostgreSQL DSN:
postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]
+ return
fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable&connect_timeout=30",
+ username, password, host, port, database), nil
case PlatformSQLServer:
- return
fmt.Sprintf("sqlserver://%s:%s@%s:%s?database=%s&trustServerCertificate=true",
- jdbc.Username, jdbc.Password, host, port, database), nil
+ // SQL Server DSN:
sqlserver://username:password@host:port?database=dbname&...
+ return
fmt.Sprintf("sqlserver://%s:%s@%s:%s?database=%s&trustServerCertificate=true&encrypt=disable&connection+timeout=30",
+ username, password, host, port, database), nil
+ case PlatformOracle:
+ // go-ora DSN: oracle://user:pass@host:port/service_name
+ // Note: Oracle's 'database' field usually corresponds to
'Service Name'
+ return fmt.Sprintf("oracle://%s:%s@%s:%s/%s",
+ username, password, host, port, database), nil
default:
- return "", fmt.Errorf("unsupported database platform: %s",
jdbc.Platform)
+ return "", fmt.Errorf("unsupported database platform for URL
construction: %s", jdbc.Platform)
}
}
// getConnection creates a database connection with timeout
-func (jc *JDBCCollector) getConnection(databaseURL, username, password string,
timeout time.Duration) (*sql.DB, error) {
- // Extract driver name from URL
+func (jc *JDBCCollector) getConnection(ctx context.Context, databaseURL
string, platform string, timeout time.Duration) (*sql.DB, error) {
+ // 1. Select driver based on platform
var driverName string
- if strings.HasPrefix(databaseURL, "postgres://") {
+ switch platform {
+ case PlatformMySQL, PlatformMariaDB:
+ driverName = "mysql"
+ case PlatformPostgreSQL:
driverName = "postgres"
- } else if strings.HasPrefix(databaseURL, "sqlserver://") {
+ case PlatformSQLServer:
driverName = "sqlserver"
- } else if strings.Contains(databaseURL, "@tcp(") {
- // MySQL DSN format (no protocol prefix)
- driverName = "mysql"
- } else {
- return nil, fmt.Errorf("unsupported database URL format: %s",
databaseURL)
+ case PlatformOracle:
+ driverName = "oracle"
+ default:
+ // Try to infer from URL (for user-customized URLs)
+ if strings.HasPrefix(databaseURL, "postgres:") {
+ driverName = "postgres"
+ } else if strings.HasPrefix(databaseURL, "sqlserver:") {
+ driverName = "sqlserver"
+ } else if strings.HasPrefix(databaseURL, "oracle:") {
+ driverName = "oracle"
+ } else if strings.Contains(databaseURL, "@tcp(") { // MySQL DSN
format
+ driverName = "mysql"
+ } else {
+ return nil, fmt.Errorf("unsupported platform or
ambiguous URL: %s", platform)
+ }
}
- // Open database connection
+ // 2. Open database connection
db, err := sql.Open(driverName, databaseURL)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
}
- // Set connection timeout and pool settings
+ // 3. Set connection pool parameters
db.SetConnMaxLifetime(timeout)
- db.SetMaxOpenConns(5) // Allow more concurrent
connections
- db.SetMaxIdleConns(2) // Keep some idle connections
- db.SetConnMaxIdleTime(30 * time.Second) // Idle connection timeout
-
- // Test connection with timeout context
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- defer cancel()
+ db.SetMaxOpenConns(5)
+ db.SetMaxIdleConns(2)
+ db.SetConnMaxIdleTime(30 * time.Second)
+ // 4. Test connection (Ping)
+ // Use the incoming context which has the overall timeout
if err := db.PingContext(ctx); err != nil {
db.Close()
return nil, fmt.Errorf("failed to ping database: %w", err)
@@ -290,9 +695,41 @@ func (jc *JDBCCollector) getConnection(databaseURL,
username, password string, t
return db, nil
}
+// convertRowValues converts raw database values (interface{}) to a string
slice
+// Deprecated: Using sql.NullString is preferred.
+func (jc *JDBCCollector) convertRowValues(rawValues []interface{}, columns
[]sql.NullString) []string {
+ stringValues := make([]string, len(rawValues))
+ for i, rawValue := range rawValues {
+ if rawValue == nil {
+ stringValues[i] = constants.NULL_VALUE // Use "NULL"
defined in Java version
+ continue
+ }
+
+ // Try using NullString (although Scan already handled it)
+ if columns[i].Valid {
+ stringValues[i] = columns[i].String
+ } else if rawValue != nil {
+ // Fallback to fmt.Sprintf
+ switch v := rawValue.(type) {
+ case []byte:
+ stringValues[i] = string(v) // Handle byte
arrays
+ case time.Time:
+ stringValues[i] = v.Format(time.RFC3339) //
Unify time format
+ default:
+ stringValues[i] = fmt.Sprintf("%v", v)
+ }
+ } else {
+ stringValues[i] = constants.NULL_VALUE
+ }
+ }
+ return stringValues
+}
+
// queryOneRow executes a query and returns only the first row
-func (jc *JDBCCollector) queryOneRow(db *sql.DB, sqlQuery string, aliasFields
[]string, response *jobtypes.CollectRepMetricsData) error {
- rows, err := db.Query(sqlQuery)
+func (jc *JDBCCollector) queryOneRow(ctx context.Context, db *sql.DB, sqlQuery
string, aliasFields []string, response *jobtypes.CollectRepMetricsData) error {
+ // Java version uses setMaxRows(1), which Go's standard library doesn't
support.
+ // We simulate this by only reading the first row.
+ rows, err := db.QueryContext(ctx, sqlQuery)
if err != nil {
return fmt.Errorf("query execution failed: %w", err)
}
@@ -304,37 +741,29 @@ func (jc *JDBCCollector) queryOneRow(db *sql.DB, sqlQuery
string, aliasFields []
return fmt.Errorf("failed to get columns: %w", err)
}
- // Build fields
- for i, column := range columns {
- field := jobtypes.Field{
- Field: column,
- Type: 1, // String type
- Label: false,
- Unit: "",
- Instance: i == 0,
- }
- response.Fields = append(response.Fields, field)
- }
+ // Build fields (using aliasFields or original column names)
+ fieldNames := jc.buildFields(response, aliasFields, columns)
- // Get first row
+ // Get the first row
if rows.Next() {
- values := make([]interface{}, len(columns))
- valuePtrs := make([]interface{}, len(columns))
+ // Use sql.NullString to handle NULL values correctly
+ values := make([]interface{}, len(fieldNames))
+ scanValues := make([]sql.NullString, len(fieldNames))
for i := range values {
- valuePtrs[i] = &values[i]
+ values[i] = &scanValues[i]
}
- if err := rows.Scan(valuePtrs...); err != nil {
+ if err := rows.Scan(values...); err != nil {
return fmt.Errorf("failed to scan row: %w", err)
}
- // Convert to string values
- stringValues := make([]string, len(values))
- for i, v := range values {
- if v != nil {
- stringValues[i] = fmt.Sprintf("%v", v)
+ // Convert
+ stringValues := make([]string, len(fieldNames))
+ for i, sv := range scanValues {
+ if sv.Valid {
+ stringValues[i] = sv.String
} else {
- stringValues[i] = ""
+ stringValues[i] = constants.NULL_VALUE
}
}
@@ -343,12 +772,12 @@ func (jc *JDBCCollector) queryOneRow(db *sql.DB, sqlQuery
string, aliasFields []
})
}
- return nil
+ return rows.Err()
}
// queryMultiRow executes a query and returns multiple rows
-func (jc *JDBCCollector) queryMultiRow(db *sql.DB, sqlQuery string,
aliasFields []string, response *jobtypes.CollectRepMetricsData) error {
- rows, err := db.Query(sqlQuery)
+func (jc *JDBCCollector) queryMultiRow(ctx context.Context, db *sql.DB,
sqlQuery string, aliasFields []string, response
*jobtypes.CollectRepMetricsData) error {
+ rows, err := db.QueryContext(ctx, sqlQuery)
if err != nil {
return fmt.Errorf("query execution failed: %w", err)
}
@@ -361,36 +790,27 @@ func (jc *JDBCCollector) queryMultiRow(db *sql.DB,
sqlQuery string, aliasFields
}
// Build fields
- for i, column := range columns {
- field := jobtypes.Field{
- Field: column,
- Type: 1, // String type
- Label: false,
- Unit: "",
- Instance: i == 0,
- }
- response.Fields = append(response.Fields, field)
- }
+ fieldNames := jc.buildFields(response, aliasFields, columns)
// Get all rows
for rows.Next() {
- values := make([]interface{}, len(columns))
- valuePtrs := make([]interface{}, len(columns))
+ values := make([]interface{}, len(fieldNames))
+ scanValues := make([]sql.NullString, len(fieldNames))
for i := range values {
- valuePtrs[i] = &values[i]
+ values[i] = &scanValues[i]
}
- if err := rows.Scan(valuePtrs...); err != nil {
+ if err := rows.Scan(values...); err != nil {
return fmt.Errorf("failed to scan row: %w", err)
}
- // Convert to string values
- stringValues := make([]string, len(values))
- for i, v := range values {
- if v != nil {
- stringValues[i] = fmt.Sprintf("%v", v)
+ // Convert
+ stringValues := make([]string, len(fieldNames))
+ for i, sv := range scanValues {
+ if sv.Valid {
+ stringValues[i] = sv.String
} else {
- stringValues[i] = ""
+ stringValues[i] = constants.NULL_VALUE
}
}
@@ -403,92 +823,158 @@ func (jc *JDBCCollector) queryMultiRow(db *sql.DB,
sqlQuery string, aliasFields
}
// queryColumns executes a query and matches two columns (similar to the Java
implementation)
-func (jc *JDBCCollector) queryColumns(db *sql.DB, sqlQuery string, aliasFields
[]string, response *jobtypes.CollectRepMetricsData) error {
- rows, err := db.Query(sqlQuery)
+func (jc *JDBCCollector) queryColumns(ctx context.Context, db *sql.DB,
sqlQuery string, aliasFields []string, response
*jobtypes.CollectRepMetricsData) error {
+ rows, err := db.QueryContext(ctx, sqlQuery)
if err != nil {
return fmt.Errorf("query execution failed: %w", err)
}
defer rows.Close()
- // Get column names
+ // Get column names (ensure at least two columns)
columns, err := rows.Columns()
if err != nil {
return fmt.Errorf("failed to get columns: %w", err)
}
-
if len(columns) < 2 {
return fmt.Errorf("columns query type requires at least 2
columns, got %d", len(columns))
}
- // Build fields from alias fields or default columns
- var fieldNames []string
- if len(aliasFields) > 0 {
- fieldNames = aliasFields
- } else {
- fieldNames = columns
+ // Build fields (must use aliasFields)
+ if len(aliasFields) == 0 {
+ return fmt.Errorf("columns query type requires aliasFields to
be defined")
}
+ fieldNames := jc.buildFields(response, aliasFields, nil)
- for i, fieldName := range fieldNames {
- field := jobtypes.Field{
- Field: fieldName,
- Type: 1, // String type
- Label: false,
- Unit: "",
- Instance: i == 0,
- }
- response.Fields = append(response.Fields, field)
- }
-
- // Create value row with column mappings
- valueRow := jobtypes.ValueRow{
- Columns: make([]string, len(fieldNames)),
- }
-
- // Process rows to build key-value mapping
+ // Create K-V map
keyValueMap := make(map[string]string)
for rows.Next() {
- values := make([]interface{}, len(columns))
- valuePtrs := make([]interface{}, len(columns))
- for i := range values {
- valuePtrs[i] = &values[i]
+ var key, value sql.NullString
+
+ // Scan only the first two columns
+ // Handle varying number of columns in result set gracefully
+ scanArgs := make([]interface{}, len(columns))
+ scanArgs[0] = &key
+ scanArgs[1] = &value
+ for i := 2; i < len(columns); i++ {
+ scanArgs[i] = new(sql.RawBytes) // Discard extra columns
}
- if err := rows.Scan(valuePtrs...); err != nil {
- return fmt.Errorf("failed to scan row: %w", err)
+ if err := rows.Scan(scanArgs...); err != nil {
+ // If scan fails (e.g., column mismatch), log warning
and continue
+ jc.logger.Error(err, "failed to scan row in columns
query")
+ continue
}
- // Use first column as key, second as value
- key := ""
- value := ""
- if values[0] != nil {
- key = fmt.Sprintf("%v", values[0])
- }
- if len(values) > 1 && values[1] != nil {
- value = fmt.Sprintf("%v", values[1])
+ if key.Valid {
+ // Corresponds to Java:
values.put(resultSet.getString(1).toLowerCase().trim(), resultSet.getString(2));
+ keyString :=
strings.ToLower(strings.TrimSpace(key.String))
+ if value.Valid {
+ keyValueMap[keyString] = value.String
+ } else {
+ keyValueMap[keyString] = constants.NULL_VALUE
+ }
}
- keyValueMap[key] = value
+ }
+ if err := rows.Err(); err != nil {
+ return err
}
- // Map values to alias fields
+ // Map to the result row
+ valueRow := jobtypes.ValueRow{
+ Columns: make([]string, len(fieldNames)),
+ }
for i, fieldName := range fieldNames {
- if value, exists := keyValueMap[fieldName]; exists {
+ // Corresponds to Java: String value =
values.get(column.toLowerCase());
+ if value, exists := keyValueMap[strings.ToLower(fieldName)];
exists {
valueRow.Columns[i] = value
} else {
- valueRow.Columns[i] = ""
+ // Check if it's response_time
+ if fieldName == constants.RESPONSE_TIME {
+ // This value will be populated at the end of
Collect by addResponseTime
+ valueRow.Columns[i] = "0"
+ } else {
+ valueRow.Columns[i] = constants.NULL_VALUE
+ }
}
}
response.Values = append(response.Values, valueRow)
- return rows.Err()
+ return nil
}
-// runScript executes a SQL script (placeholder implementation)
-func (jc *JDBCCollector) runScript(db *sql.DB, scriptPath string, response
*jobtypes.CollectRepMetricsData) error {
- // For security reasons, we'll just return success without actually
running scripts
- jc.logger.Info("script execution requested but disabled for security",
"scriptPath", scriptPath)
+// runScript executes a SQL script (as content, not file path)
+func (jc *JDBCCollector) runScript(ctx context.Context, db *sql.DB,
scriptContent string, response *jobtypes.CollectRepMetricsData) error {
+ // The Java version uses ScriptUtils to execute a file; here we execute
the content from the SQL field.
+ // Note: DSN must have multiStatements=true enabled (e.g., for MySQL).
+ result, err := db.ExecContext(ctx, scriptContent)
+ if err != nil {
+ return fmt.Errorf("script execution failed: %w", err)
+ }
+
+ // Script execution usually doesn't return rows, but we can return the
affected rows.
+ // We define a "rows_affected" field.
+ jc.buildFields(response, nil, []string{"rows_affected"})
+ rowsAffected, err := result.RowsAffected()
+ if err != nil {
+ // Some drivers (like PostgreSQL) might not support
RowsAffected for scripts
+ jc.logger.V(1).Info("could not get rows affected from script
execution", "error", err)
+ response.Values = append(response.Values, jobtypes.ValueRow{
+ Columns: []string{"0"},
+ })
+ } else {
+ response.Values = append(response.Values, jobtypes.ValueRow{
+ Columns: []string{strconv.FormatInt(rowsAffected, 10)},
+ })
+ }
+
return nil
}
+// buildFields populates the response.Fields slice based on aliasFields or
columns
+// It returns the list of field names that should be used for value mapping.
+func (jc *JDBCCollector) buildFields(response *jobtypes.CollectRepMetricsData,
aliasFields []string, columns []string) []string {
+ var fieldNames []string
+ if len(aliasFields) > 0 {
+ fieldNames = aliasFields
+ } else {
+ fieldNames = columns
+ }
+
+ response.Fields = make([]jobtypes.Field, len(fieldNames))
+ for i, fieldName := range fieldNames {
+ response.Fields[i] = jobtypes.Field{
+ Field: fieldName,
+ Type: constants.TYPE_STRING,
+ Label: false,
+ Unit: "",
+ Instance: i == 0, // Default first as instance
+ }
+ }
+ return fieldNames
+}
+
+// addResponseTime checks if response_time is requested and adds it to all
value rows
+func (jc *JDBCCollector) addResponseTime(response
*jobtypes.CollectRepMetricsData, aliasFields []string, duration time.Duration) {
+ rtIndex := -1
+ for i, field := range aliasFields {
+ if field == constants.RESPONSE_TIME {
+ rtIndex = i
+ break
+ }
+ }
+
+ if rtIndex == -1 {
+ return
+ }
+
+ responseTimeMs := strconv.FormatInt(duration.Milliseconds(), 10)
+ for i := range response.Values {
+ if len(response.Values[i].Columns) > rtIndex {
+ response.Values[i].Columns[rtIndex] = responseTimeMs
+ }
+ }
+}
+
// createSuccessResponse creates a successful metrics data response
func (jc *JDBCCollector) createSuccessResponse(metrics *jobtypes.Metrics)
*jobtypes.CollectRepMetricsData {
return &jobtypes.CollectRepMetricsData{
@@ -498,7 +984,7 @@ func (jc *JDBCCollector) createSuccessResponse(metrics
*jobtypes.Metrics) *jobty
Metrics: metrics.Name,
Priority: 0,
Time: time.Now().UnixMilli(),
- Code: 200, // Success
+ Code: CodeSuccess,
Msg: "success",
Fields: make([]jobtypes.Field, 0),
Values: make([]jobtypes.ValueRow, 0),
diff --git a/internal/collector/basic/ssh/ssh_collector.go
b/internal/collector/basic/ssh/ssh_collector.go
index 39ef137..79e4370 100644
--- a/internal/collector/basic/ssh/ssh_collector.go
+++ b/internal/collector/basic/ssh/ssh_collector.go
@@ -38,7 +38,7 @@ const (
// Special fields
ResponseTime = "responseTime"
- NullValue = consts.NullValue
+ NullValue = consts.NULL_VALUE
// Parse types
ParseTypeOneRow = "oneRow" // Each line corresponds to one field
value, in order of aliasFields
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go
b/internal/collector/common/collect/dispatch/metrics_collector.go
index 1a36d52..c4aea7e 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -85,7 +85,7 @@ func (mc *MetricsCollector) CalculateFields(metrics
*jobtypes.Metrics, result *j
break
}
aliasFieldValue := aliasRow.Columns[aliasIndex]
- if aliasFieldValue == constants.NullValue {
+ if aliasFieldValue == constants.NULL_VALUE {
fieldValueMap[aliasField] = nil
stringTypeFieldValueMap[aliasField] = nil
continue
@@ -115,7 +115,7 @@ func (mc *MetricsCollector) CalculateFields(metrics
*jobtypes.Metrics, result *j
if program, exists := fieldExpressionMap[realField];
exists {
var context map[string]interface{}
- if field.Type == constants.TypeString {
+ if field.Type == constants.TYPE_STRING {
context = stringTypeFieldValueMap
} else {
context = fieldValueMap
@@ -146,9 +146,9 @@ func (mc *MetricsCollector) CalculateFields(metrics
*jobtypes.Metrics, result *j
}
// Process based on field type if value exists
- if value != "" && value != constants.NullValue {
+ if value != "" && value != constants.NULL_VALUE {
switch field.Type {
- case constants.TypeNumber:
+ case constants.TYPE_NUMBER:
// Extract numeric value and format
doubleAndUnit :=
replacer.ExtractDoubleAndUnitFromStr(value)
if doubleAndUnit != nil {
@@ -180,24 +180,24 @@ func (mc *MetricsCollector) CalculateFields(metrics
*jobtypes.Metrics, result *j
}
value =
formatNumber(numericValue)
} else {
- value = constants.NullValue
+ value = constants.NULL_VALUE
}
- case constants.TypeTime:
+ case constants.TYPE_TIME:
// TODO: Implement time parsing
// For now, keep original value
}
}
if value == "" {
- value = constants.NullValue
+ value = constants.NULL_VALUE
}
realValueRow.Columns = append(realValueRow.Columns,
value)
// Add the calculated field value to context maps for
use in subsequent expressions
// This allows later expressions to reference earlier
calculated fields
- if value != constants.NullValue {
+ if value != constants.NULL_VALUE {
doubleAndUnit :=
replacer.ExtractDoubleAndUnitFromStr(value)
if doubleAndUnit != nil {
fieldValueMap[realField] =
doubleAndUnit.Value
diff --git
a/internal/collector/common/collect/dispatch/metrics_collector_test.go
b/internal/collector/common/collect/dispatch/metrics_collector_test.go
index 07186a3..bae124d 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector_test.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector_test.go
@@ -47,9 +47,9 @@ func TestCalculateFieldsWithUnitConversion(t *testing.T) {
Name: "memory",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "total", Type:
constants.TypeNumber},
- {Field: "used", Type:
constants.TypeNumber},
- {Field: "free", Type:
constants.TypeNumber},
+ {Field: "total", Type:
constants.TYPE_NUMBER},
+ {Field: "used", Type:
constants.TYPE_NUMBER},
+ {Field: "free", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"total", "used", "free"},
Units: []string{"total=B->MB",
"used=B->MB", "free=B->MB"},
@@ -70,8 +70,8 @@ func TestCalculateFieldsWithUnitConversion(t *testing.T) {
Name: "disk",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "size", Type:
constants.TypeNumber},
- {Field: "usage", Type:
constants.TypeString},
+ {Field: "size", Type:
constants.TYPE_NUMBER},
+ {Field: "usage", Type:
constants.TYPE_STRING},
},
AliasFields: []string{"size", "usage"},
Units: []string{"size=KB->GB"},
@@ -92,7 +92,7 @@ func TestCalculateFieldsWithUnitConversion(t *testing.T) {
Name: "memory",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "committed", Type:
constants.TypeNumber},
+ {Field: "committed", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"committed"},
Units: []string{"committed=B->MB"},
@@ -113,8 +113,8 @@ func TestCalculateFieldsWithUnitConversion(t *testing.T) {
Name: "cpu",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "usage", Type:
constants.TypeNumber},
- {Field: "cores", Type:
constants.TypeNumber},
+ {Field: "usage", Type:
constants.TYPE_NUMBER},
+ {Field: "cores", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"usage", "cores"},
},
@@ -181,8 +181,8 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
Name: "memory",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "total", Type:
constants.TypeNumber},
- {Field: "used", Type:
constants.TypeNumber},
+ {Field: "total", Type:
constants.TYPE_NUMBER},
+ {Field: "used", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"total_bytes",
"used_bytes"},
Calculates: []string{"total=total_bytes",
"used=used_bytes"},
@@ -203,9 +203,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
Name: "memory",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "total", Type:
constants.TypeNumber},
- {Field: "used", Type:
constants.TypeNumber},
- {Field: "usage", Type:
constants.TypeNumber},
+ {Field: "total", Type:
constants.TYPE_NUMBER},
+ {Field: "used", Type:
constants.TYPE_NUMBER},
+ {Field: "usage", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"total", "used"},
Calculates: []string{"usage=(used / total) *
100"},
@@ -226,10 +226,10 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
Name: "disk",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "total", Type:
constants.TypeNumber},
- {Field: "used", Type:
constants.TypeNumber},
- {Field: "free", Type:
constants.TypeNumber},
- {Field: "usage_percent", Type:
constants.TypeNumber},
+ {Field: "total", Type:
constants.TYPE_NUMBER},
+ {Field: "used", Type:
constants.TYPE_NUMBER},
+ {Field: "free", Type:
constants.TYPE_NUMBER},
+ {Field: "usage_percent", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"total_space",
"used_space", "free_space", "usage"},
Calculates: []string{
@@ -255,9 +255,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
Name: "memory",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "total", Type:
constants.TypeNumber},
- {Field: "used", Type:
constants.TypeNumber},
- {Field: "free", Type:
constants.TypeNumber},
+ {Field: "total", Type:
constants.TYPE_NUMBER},
+ {Field: "used", Type:
constants.TYPE_NUMBER},
+ {Field: "free", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"total_mb", "used_mb",
"free_mb"},
Calculates: []jobtypes.Calculate{
@@ -282,10 +282,10 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
Name: "jvm",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "heap_max", Type:
constants.TypeNumber},
- {Field: "heap_used", Type:
constants.TypeNumber},
- {Field: "heap_free", Type:
constants.TypeNumber},
- {Field: "heap_usage", Type:
constants.TypeNumber},
+ {Field: "heap_max", Type:
constants.TYPE_NUMBER},
+ {Field: "heap_used", Type:
constants.TYPE_NUMBER},
+ {Field: "heap_free", Type:
constants.TYPE_NUMBER},
+ {Field: "heap_usage", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"max", "used", "free",
"usage"},
Calculates: []jobtypes.Calculate{
@@ -311,8 +311,8 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
Name: "cpu",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "idle", Type:
constants.TypeNumber},
- {Field: "usage", Type:
constants.TypeNumber},
+ {Field: "idle", Type:
constants.TYPE_NUMBER},
+ {Field: "usage", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"idle_percent",
"usage_percent"},
Calculates: []interface{}{"idle=idle_percent",
"usage=usage_percent"},
@@ -333,9 +333,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
Name: "network",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "rx_bytes", Type:
constants.TypeNumber},
- {Field: "tx_bytes", Type:
constants.TypeNumber},
- {Field: "total_bytes", Type:
constants.TypeNumber},
+ {Field: "rx_bytes", Type:
constants.TYPE_NUMBER},
+ {Field: "tx_bytes", Type:
constants.TYPE_NUMBER},
+ {Field: "total_bytes", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"received",
"transmitted", "total"},
Calculates: []interface{}{
@@ -360,9 +360,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
Name: "system",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "load1", Type:
constants.TypeNumber},
- {Field: "load5", Type:
constants.TypeNumber},
- {Field: "load15", Type:
constants.TypeNumber},
+ {Field: "load1", Type:
constants.TYPE_NUMBER},
+ {Field: "load5", Type:
constants.TYPE_NUMBER},
+ {Field: "load15", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"load_1min", "load_5min",
"load_15min"},
Calculates: []interface{}{
@@ -387,9 +387,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
Name: "database",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "connections", Type:
constants.TypeNumber},
- {Field: "max_connections", Type:
constants.TypeNumber},
- {Field: "usage_rate", Type:
constants.TypeNumber},
+ {Field: "connections", Type:
constants.TYPE_NUMBER},
+ {Field: "max_connections", Type:
constants.TYPE_NUMBER},
+ {Field: "usage_rate", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"active", "max"},
Calculates: []interface{}{
@@ -414,9 +414,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
Name: "storage",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "total", Type:
constants.TypeNumber},
- {Field: "used", Type:
constants.TypeNumber},
- {Field: "available", Type:
constants.TypeNumber},
+ {Field: "total", Type:
constants.TYPE_NUMBER},
+ {Field: "used", Type:
constants.TYPE_NUMBER},
+ {Field: "available", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"capacity", "occupied",
"free_space"},
Calculates: []interface{}{
@@ -441,7 +441,7 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
Name: "simple",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "value", Type:
constants.TypeNumber},
+ {Field: "value", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"value"},
},
@@ -614,9 +614,9 @@ func TestCalculateFieldsWithUnitStructFormat(t *testing.T) {
Name: "memory",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "total", Type:
constants.TypeNumber},
- {Field: "used", Type:
constants.TypeNumber},
- {Field: "free", Type:
constants.TypeNumber},
+ {Field: "total", Type:
constants.TYPE_NUMBER},
+ {Field: "used", Type:
constants.TYPE_NUMBER},
+ {Field: "free", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"total", "used", "free"},
Units: []jobtypes.Unit{
@@ -641,9 +641,9 @@ func TestCalculateFieldsWithUnitStructFormat(t *testing.T) {
Name: "storage",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "disk_total", Type:
constants.TypeNumber},
- {Field: "disk_used", Type:
constants.TypeNumber},
- {Field: "disk_free", Type:
constants.TypeNumber},
+ {Field: "disk_total", Type:
constants.TYPE_NUMBER},
+ {Field: "disk_used", Type:
constants.TYPE_NUMBER},
+ {Field: "disk_free", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"disk_total",
"disk_used", "disk_free"},
Units: []interface{}{
@@ -668,9 +668,9 @@ func TestCalculateFieldsWithUnitStructFormat(t *testing.T) {
Name: "jvm",
Priority: 0,
Fields: []jobtypes.Field{
- {Field: "heap_max", Type:
constants.TypeNumber},
- {Field: "heap_used", Type:
constants.TypeNumber},
- {Field: "heap_usage", Type:
constants.TypeNumber},
+ {Field: "heap_max", Type:
constants.TYPE_NUMBER},
+ {Field: "heap_used", Type:
constants.TYPE_NUMBER},
+ {Field: "heap_usage", Type:
constants.TYPE_NUMBER},
},
AliasFields: []string{"max_bytes",
"used_bytes"},
Calculates: []string{"heap_max=max_bytes",
"heap_used=used_bytes", "heap_usage=(heap_used / heap_max) * 100"},
diff --git a/internal/collector/common/types/job/metrics_types.go
b/internal/collector/common/types/job/metrics_types.go
index ef88be8..6474be1 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/collector/common/types/job/metrics_types.go
@@ -205,18 +205,18 @@ type SSHProtocol struct {
// JDBCProtocol represents JDBC protocol configuration
type JDBCProtocol struct {
- Host string `json:"host"`
- Port string `json:"port"`
- Platform string `json:"platform"`
- Username string `json:"username"`
- Password string `json:"password"`
- Database string `json:"database"`
- Timeout string `json:"timeout"`
- QueryType string `json:"queryType"`
- SQL string `json:"sql"`
- URL string `json:"url"`
- ReuseConnection string `json:"reuseConnection"`
- SSHTunnel map[string]interface{} `json:"sshTunnel,omitempty"`
+ Host string `json:"host"`
+ Port string `json:"port"`
+ Platform string `json:"platform"`
+ Username string `json:"username"`
+ Password string `json:"password"`
+ Database string `json:"database"`
+ Timeout string `json:"timeout"`
+ QueryType string `json:"queryType"`
+ SQL string `json:"sql"`
+ URL string `json:"url"`
+ ReuseConnection string `json:"reuseConnection"`
+ SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"`
}
// SNMPProtocol represents SNMP protocol configuration
diff --git a/internal/constants/const.go b/internal/constants/const.go
index 6895fcb..d6ac8de 100644
--- a/internal/constants/const.go
+++ b/internal/constants/const.go
@@ -1,49 +1,78 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Package constants defines public common constants,
+// ported from org.apache.hertzbeat.common.constants.CommonConstants
package constants
-// System constants
+// Collect Response status code (Go Collector specific)
const (
- KeyWord = "keyword"
- NullValue = " "
+ CollectSuccess = 0 // Generic success
+ CollectUnavailable = 1 // Service unavailable (e.g., target service
error)
+ CollectUnReachable = 2 // Network unreachable (e.g., network timeout,
DNS fail)
+ CollectUnConnectable = 3 // Connection failed (e.g., port closed, SSH
auth fail)
+ CollectFail = 4 // Generic failure (e.g., query error, script
exec fail)
+ CollectTimeout = 5 // Collection timeout
)
-// Field type constants
+// Field parameter types (Aligned with CommonConstants.java)
const (
- TypeNumber = 0
- TypeString = 1
- TypeTime = 2
+ TYPE_NUMBER = 0 // Field parameter type: number
+ TYPE_STRING = 1 // Field parameter type: String
+ TYPE_SECRET = 2 // Field parameter type: encrypted string
+ TYPE_TIME = 3 // Field parameter type: time
)
-// Service related constants
+// Monitoring status (Aligned with CommonConstants.java)
const (
- MongoDbAtlasModel = "mongodb-atlas"
+ MONITOR_PAUSED_CODE = 0 // 0: Paused
+ MONITOR_UP_CODE = 1 // 1: Up
+ MONITOR_DOWN_CODE = 2 // 2: Down
+)
- PostgreSQLUnReachAbleCode = "08001"
- ZookeeperApp = "zookeeper"
- ZookeeperEnviHeader = "Environment:"
+// Common metric/label keys
+const (
+ // Collection metric value: null placeholder for empty value
+ NULL_VALUE = " "
+
+ // Common metric keys
+ ErrorMsg = "errorMsg"
+ RESPONSE_TIME = "responseTime"
+ StatusCode = "statusCode"
+
+ // Label keys (Aligned with CommonConstants.java)
+ LABEL_INSTANCE = "instance"
+ LABEL_DEFINE_ID = "defineid"
+ LABEL_ALERT_NAME = "alertname"
+ LABEL_INSTANCE_HOST = "instancehost"
+ LABEL_INSTANCE_NAME = "instancename"
+ LABEL_ALERT_SEVERITY = "severity"
)
-// API constants
+// Service specific constants
const (
- ErrorMsg = "errorMsg"
- ResponseTime = "responseTime"
- StatusCode = "statusCode"
+ MongoDbAtlasModel = "mongodb-atlas"
+
+ // PostgreSQLUnReachAbleCode Specific SQLState for connection failure
+ PostgreSQLUnReachAbleCode = "08001"
+ // ZookeeperApp App name for Zookeeper
+ ZookeeperApp = "zookeeper"
+ // ZookeeperEnviHeader Header string in Zookeeper 'envi' command output
+ ZookeeperEnviHeader = "Environment:"
)
// Function related constants
@@ -51,12 +80,11 @@ const (
CollectorModule = "collector"
)
-// Collect Response status code
+// Legacy or alias constants
+// These are kept for compatibility with previous Go code logic
const (
- CollectSuccess = 0
- CollectUnavailable = 1
- CollectUnReachable = 2
- CollectUnConnectable = 3
- CollectFail = 4
- CollectTimeout = 5
+ // FieldTypeString Alias for TYPE_STRING
+ FieldTypeString = TYPE_STRING
+ // KeyWord Deprecated placeholder
+ KeyWord = "keyword"
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]