This is an automated email from the ASF dual-hosted git repository.

shown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git


The following commit(s) were added to refs/heads/main by this push:
     new ed20c5c  feat: enhance JDBC collector with SSH tunnel support and 
security val… (#27)
ed20c5c is described below

commit ed20c5ce69dd38a0405ea8e13b86e665bc7c8bd8
Author: Logic <[email protected]>
AuthorDate: Tue Nov 11 20:05:37 2025 +0800

    feat: enhance JDBC collector with SSH tunnel support and security val… (#27)
---
 go.mod                                             |   1 +
 go.sum                                             |   2 +
 .../collector/basic/database/jdbc_collector.go     | 864 ++++++++++++++++-----
 internal/collector/basic/ssh/ssh_collector.go      |   2 +-
 .../common/collect/dispatch/metrics_collector.go   |  16 +-
 .../collect/dispatch/metrics_collector_test.go     |  96 +--
 .../collector/common/types/job/metrics_types.go    |  24 +-
 internal/constants/const.go                        | 106 ++-
 8 files changed, 814 insertions(+), 297 deletions(-)

diff --git a/go.mod b/go.mod
index 560318f..b924b9a 100644
--- a/go.mod
+++ b/go.mod
@@ -11,6 +11,7 @@ require (
        github.com/lib/pq v1.10.9
        github.com/microsoft/go-mssqldb v1.9.3
        github.com/prometheus/client_golang v1.23.0
+       github.com/sijms/go-ora/v2 v2.9.0
        github.com/spf13/cobra v1.10.1
        github.com/stretchr/testify v1.10.0
        go.uber.org/zap v1.27.0
diff --git a/go.sum b/go.sum
index 2587cf1..b54c994 100644
--- a/go.sum
+++ b/go.sum
@@ -82,6 +82,8 @@ github.com/prometheus/procfs v0.16.1/go.mod 
h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlT
 github.com/rogpeppe/go-internal v1.13.1 
h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
 github.com/rogpeppe/go-internal v1.13.1/go.mod 
h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod 
h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/sijms/go-ora/v2 v2.9.0 
h1:+iQbUeTeCOFMb5BsOMgUhV8KWyrv9yjKpcK4x7+MFrg=
+github.com/sijms/go-ora/v2 v2.9.0/go.mod 
h1:QgFInVi3ZWyqAiJwzBQA+nbKYKH77tdp1PYoCqhR2dU=
 github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s=
 github.com/spf13/cobra v1.10.1/go.mod 
h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0=
 github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
diff --git a/internal/collector/basic/database/jdbc_collector.go 
b/internal/collector/basic/database/jdbc_collector.go
index 86f01be..f91d79d 100644
--- a/internal/collector/basic/database/jdbc_collector.go
+++ b/internal/collector/basic/database/jdbc_collector.go
@@ -1,33 +1,44 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package database implements JDBC (sql.DB) based database collection.
 package database
 
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"net/url"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
	_ "github.com/lib/pq"
	pq "github.com/lib/pq"
	_ "github.com/microsoft/go-mssqldb"
	_ "github.com/sijms/go-ora/v2" // Oracle driver
	"golang.org/x/crypto/ssh"

	jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
	"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
	"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
@@ -49,14 +60,245 @@ const (
        PlatformPostgreSQL = "postgresql"
        PlatformSQLServer  = "sqlserver"
        PlatformOracle     = "oracle"
+       PlatformClickHouse = "clickhouse" // Placeholder, Go driver DSN TBD
+       PlatformDM         = "dm"         // Placeholder, Go driver DSN TBD
 
        // Response codes
        CodeSuccess       = constants.CollectSuccess
        CodeFail          = constants.CollectFail
        CodeUnReachable   = constants.CollectUnReachable
        CodeUnConnectable = constants.CollectUnConnectable
+
+       // PostgreSQL specific SQLState for unreachable
+       // See: 
https://www.postgresql.org/docs/current/errcodes-appendix.html#ERRCODES-TABLE
+       pqUnreachableCode = "08001" // 
SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
 )
 
// --- Security Check Constants ---
// Ported from Java version (JdbcCommonCollect)

var (
	// vulnerableKeywords lists connection-URL parameters that enable local
	// file reads (e.g. MySQL LOAD DATA LOCAL INFILE) and are rejected
	// outright by validateURL before any other check.
	vulnerableKeywords = []string{"allowLoadLocalInfile", "allowLoadLocalInfileInPath", "useLocalInfile"}

	// blackList holds lower-cased substrings that must never appear in a
	// recursively decoded, lower-cased JDBC URL, grouped by attack class.
	// NOTE(review): "create function" appears twice (also listed under
	// "Dangerous SQL commands") — harmless but redundant.
	blackList = []string{
		// Dangerous SQL commands
		"create trigger", "create alias", "runscript from", "shutdown", "drop table",
		"drop database", "create function", "alter system", "grant all", "revoke all",

		// File IO
		"allowloadlocalinfile", "allowloadlocalinfileinpath", "uselocalinfile",

		// Code execution
		"init=", "javaobjectserializer=", "runscript", "serverstatusdiffinterceptor",
		"queryinterceptors=", "statementinterceptors=", "exceptioninterceptors=",
		"xp_cmdshell", "create function", "dbms_java", "sp_sysexecute", "load_file",

		// Multiple statements
		"allowmultiqueries",

		// Deserialization
		"autodeserialize", "detectcustomcollations",
	}

	// universalBypassPatterns catches whitespace-obfuscated variants of the
	// blacklist entries (e.g. "drop   table") on every platform.
	// Simplified port of the Java version, focusing on whitespace bypass.
	universalBypassPatterns = []string{
		`(?i)create\s+trigger`,
		`(?i)create\s+function`,
		`(?i)drop\s+table`,
		`(?i)drop\s+database`,
		`(?i)run\s+script`,
		`(?i)alter\s+system`,
		`(?i)grant\s+all`,
		`(?i)revoke\s+all`,
		`(?i)xp\s*cmdshell`,
		`(?i)load\s+file`,
	}

	// platformBypassPatterns holds additional obfuscation patterns checked
	// only for the named (lower-cased) platform.
	platformBypassPatterns = map[string][]string{
		"mysql": {
			`(?i)allow\s+load\s+local\s+infile`,
			`(?i)allow\s+multi\s+queries`,
			`(?i)query\s+interceptors`,
			`(?i)auto\s+deserialize`,
		},
		"mariadb": {
			`(?i)allow\s+load\s+local\s+infile`,
			`(?i)allow\s+multi\s+queries`,
			`(?i)query\s+interceptors`,
			`(?i)auto\s+deserialize`,
		},
		"postgresql": {
			`(?i)socket\s+factory`,
			`(?i)logger\s+file`,
			`(?i)logger\s+level`,
		},
	}

	// Compiled forms of the two pattern lists above; populated lazily by
	// initPatterns (guarded by compilePatternsOnce).
	compiledUniversalBypassPatterns []*regexp.Regexp
	compiledPlatformBypassPatterns  map[string][]*regexp.Regexp

	// invalidCharRegex strips control characters (C0 range, DEL, NBSP)
	// from a URL before validation.
	invalidCharRegex = regexp.MustCompile(`[\x00-\x1F\x7F\xA0]`)

	// compilePatternsOnce ensures regex patterns are compiled only once.
	compilePatternsOnce sync.Once
)
+
+// initPatterns compiles all security check regular expressions
+func initPatterns() {
+       compiledUniversalBypassPatterns = make([]*regexp.Regexp, 
len(universalBypassPatterns))
+       for i, pattern := range universalBypassPatterns {
+               compiled, err := regexp.Compile(pattern)
+               if err != nil {
+                       log.Fatalf("Failed to compile universal regex pattern 
%q: %v", pattern, err)
+               }
+               compiledUniversalBypassPatterns[i] = compiled
+       }
+
+       compiledPlatformBypassPatterns = make(map[string][]*regexp.Regexp)
+       for platform, patterns := range platformBypassPatterns {
+               compiledPlatformBypassPatterns[platform] = 
make([]*regexp.Regexp, len(patterns))
+               for i, pattern := range patterns {
+                       compiled, err := regexp.Compile(pattern)
+                       if err != nil {
+                               log.Fatalf("Failed to compile platform regex 
pattern %q for %s: %v", pattern, platform, err)
+                       }
+                       compiledPlatformBypassPatterns[platform][i] = compiled
+               }
+       }
+}
+
// --- SSH Tunnel ---

// sshTunnelHelper manages an SSH tunnel connection: the SSH session to the
// bastion, the local loopback listener whose traffic is forwarded through
// it, and the logger used by the forwarding goroutines.
type sshTunnelHelper struct {
	client   *ssh.Client  // established SSH session to the bastion host
	listener net.Listener // local 127.0.0.1 listener fronting the database
	logger   logger.Logger
}
+
+// startSSHTunnel starts an SSH tunnel
+// It connects to the SSH bastion, listens on a local random port,
+// and forwards traffic to the target database.
+func startSSHTunnel(config *jobtypes.SSHTunnel, remoteHost, remotePort string, 
timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string, string, 
error) {
+       sshHost := config.Host
+       sshPort, err := strconv.Atoi(config.Port)
+       if err != nil {
+               return nil, "", "", fmt.Errorf("invalid SSH port: %v", err)
+       }
+       sshUser := config.Username
+       sshPassword := config.Password
+
+       // 1. Set up SSH client configuration
+       sshConfig := &ssh.ClientConfig{
+               User: sshUser,
+               Auth: []ssh.AuthMethod{
+                       ssh.Password(sshPassword),
+               },
+               HostKeyCallback: ssh.InsecureIgnoreHostKey(), // Ignore host 
key validation
+               Timeout:         timeout,
+       }
+
+       sshAddr := fmt.Sprintf("%s:%d", sshHost, sshPort)
+       log.V(1).Info("starting SSH tunnel", "sshAddr", sshAddr, "remoteHost", 
remoteHost, "remotePort", remotePort)
+
+       // 2. Connect to the SSH bastion
+       client, err := ssh.Dial("tcp", sshAddr, sshConfig)
+       if err != nil {
+               return nil, "", "", fmt.Errorf("failed to dial SSH server: %w", 
err)
+       }
+
+       // 3. Listen on a random local port on 127.0.0.1
+       listener, err := net.Listen("tcp", "127.0.0.1:0")
+       if err != nil {
+               client.Close()
+               return nil, "", "", fmt.Errorf("failed to listen on local port: 
%w", err)
+       }
+
+       localAddr := listener.Addr().String()
+       localHost, localPort, err := net.SplitHostPort(localAddr)
+       if err != nil {
+               client.Close()
+               listener.Close()
+               return nil, "", "", fmt.Errorf("failed to parse local address: 
%w", err)
+       }
+
+       helper := &sshTunnelHelper{
+               client:   client,
+               listener: listener,
+               logger:   log,
+       }
+
+       // 4. Start a goroutine to accept local connections and forward them
+       go helper.runForwarder(remoteHost, remotePort)
+
+       log.V(1).Info("SSH tunnel started", "localAddr", localAddr, "remote", 
fmt.Sprintf("%s:%s", remoteHost, remotePort))
+       return helper, localHost, localPort, nil
+}
+
+// runForwarder loops to accept and forward local connections
+func (t *sshTunnelHelper) runForwarder(remoteHost, remotePort string) {
+       remoteAddr := fmt.Sprintf("%s:%s", remoteHost, remotePort)
+       for {
+               localConn, err := t.listener.Accept()
+               if err != nil {
+                       // If the listener is closed, exit the loop
+                       if strings.Contains(err.Error(), "use of closed network 
connection") {
+                               break
+                       }
+                       t.logger.Error(err, "failed to accept local connection")
+                       continue
+               }
+
+               // Start a goroutine to handle forwarding
+               go t.forwardConnection(localConn, remoteAddr)
+       }
+}
+
+// forwardConnection handles forwarding for a single connection
+func (t *sshTunnelHelper) forwardConnection(localConn net.Conn, remoteAddr 
string) {
+       defer localConn.Close()
+
+       // 3. Connect to the remote database via the SSH tunnel
+       remoteConn, err := t.client.Dial("tcp", remoteAddr)
+       if err != nil {
+               t.logger.Error(err, "failed to dial remote address via SSH", 
"remoteAddr", remoteAddr)
+               return
+       }
+       defer remoteConn.Close()
+
+       // 4. Bidirectionally copy data
+       go func() {
+               _, err := io.Copy(remoteConn, localConn)
+               if err != nil && err != io.EOF {
+                       // t.logger.V(1).Error(err, "error copying local to 
remote")
+               }
+               remoteConn.Close()
+               localConn.Close()
+       }()
+       go func() {
+               _, err := io.Copy(localConn, remoteConn)
+               if err != nil && err != io.EOF {
+                       // t.logger.V(1).Error(err, "error copying remote to 
local")
+               }
+               remoteConn.Close()
+               localConn.Close()
+       }()
+}
+
+// Close shuts down the SSH tunnel
+func (t *sshTunnelHelper) Close() {
+       t.listener.Close()
+       t.client.Close()
+       t.logger.V(1).Info("SSH tunnel closed")
+}
+
+// --- JDBC Collector ---
+
 // JDBCCollector implements JDBC database collection
 type JDBCCollector struct {
        logger logger.Logger
@@ -64,13 +306,14 @@ type JDBCCollector struct {
 
// NewJDBCCollector creates a new JDBC collector with a named sub-logger.
// It also triggers one-time compilation of the URL security-check patterns.
func NewJDBCCollector(logger logger.Logger) *JDBCCollector {
	// Ensure security patterns are compiled only once
	compilePatternsOnce.Do(initPatterns)
	return &JDBCCollector{
		logger: logger.WithName("jdbc-collector"),
	}
}
 
// extractJDBCConfig extracts JDBC configuration from interface{} type,
// delegating to the shared parameter replacer for consistent extraction.
func extractJDBCConfig(jdbcInterface interface{}) (*jobtypes.JDBCProtocol, error) {
	replacer := param.NewReplacer()
	return replacer.ExtractJDBCConfig(jdbcInterface)
}
@@ -81,7 +324,6 @@ func (jc *JDBCCollector) PreCheck(metrics *jobtypes.Metrics) 
error {
        if metrics == nil {
                return fmt.Errorf("metrics is nil")
        }
-
        if metrics.JDBC == nil {
                return fmt.Errorf("JDBC protocol configuration is required")
        }
@@ -95,8 +337,19 @@ func (jc *JDBCCollector) PreCheck(metrics 
*jobtypes.Metrics) error {
                return fmt.Errorf("JDBC configuration is required")
        }
 
-       // Validate required fields when URL is not provided
-       if jdbcConfig.URL == "" {
+       // 1. Validate SSH tunnel configuration
+       if err := jc.checkTunnelParam(jdbcConfig.SSHTunnel); err != nil {
+               return err
+       }
+
+       // 2. Validate URL
+       if jdbcConfig.URL != "" {
+               // If a full URL is provided, perform strict security checks
+               if err := validateURL(jdbcConfig.URL, jdbcConfig.Platform); err 
!= nil {
+                       return err
+               }
+       } else {
+               // If URL is not provided, check basic host/port/platform
                if jdbcConfig.Host == "" {
                        return fmt.Errorf("host is required when URL is not 
provided")
                }
@@ -108,99 +361,140 @@ func (jc *JDBCCollector) PreCheck(metrics 
*jobtypes.Metrics) error {
                }
        }
 
-       // Validate platform
+       // 3. Validate platform (Platform can be empty if URL is provided,
+       //    but required if URL needs to be auto-built)
        if jdbcConfig.Platform != "" {
                switch jdbcConfig.Platform {
                case PlatformMySQL, PlatformMariaDB, PlatformPostgreSQL, 
PlatformSQLServer, PlatformOracle:
                        // Valid platforms
+               case PlatformClickHouse, PlatformDM:
+                       // Platforms not yet fully supported
+                       jc.logger.Info("warning: platform %s is not fully 
supported in Go collector yet", jdbcConfig.Platform)
                default:
                        return fmt.Errorf("unsupported database platform: %s", 
jdbcConfig.Platform)
                }
        }
 
-       // Validate query type
-       if jdbcConfig.QueryType != "" {
-               switch jdbcConfig.QueryType {
-               case QueryTypeOneRow, QueryTypeMultiRow, QueryTypeColumns, 
QueryTypeRunScript:
-                       // Valid query types
-               default:
-                       return fmt.Errorf("unsupported query type: %s", 
jdbcConfig.QueryType)
-               }
+       // 4. Validate query type
+       if jdbcConfig.QueryType == "" {
+               // Default to oneRow
+               jdbcConfig.QueryType = QueryTypeOneRow
+       }
+       switch jdbcConfig.QueryType {
+       case QueryTypeOneRow, QueryTypeMultiRow, QueryTypeColumns, 
QueryTypeRunScript:
+               // Valid query types
+       default:
+               return fmt.Errorf("unsupported query type: %s", 
jdbcConfig.QueryType)
        }
 
-       // Validate SQL for most query types
-       if jdbcConfig.QueryType != QueryTypeRunScript && jdbcConfig.SQL == "" {
-               return fmt.Errorf("SQL is required for query type: %s", 
jdbcConfig.QueryType)
+       // 5. Validate SQL (The SQL field for runScript contains the script 
content and must also be checked)
+       if jdbcConfig.SQL == "" {
+               return fmt.Errorf("SQL/Script content is required for query 
type: %s", jdbcConfig.QueryType)
        }
 
        return nil
 }
 
+// checkTunnelParam validates SSH tunnel configuration
+func (jc *JDBCCollector) checkTunnelParam(config *jobtypes.SSHTunnel) error {
+       if config == nil {
+               return nil
+       }
+       enable, _ := strconv.ParseBool(config.Enable)
+       if !enable {
+               return nil
+       }
+       if config.Host == "" || config.Port == "" || config.Username == "" {
+               return fmt.Errorf("ssh tunnel host, port, and username are 
required when enabled")
+       }
+       return nil
+}
+
 // Collect performs JDBC metrics collection
 func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) 
*jobtypes.CollectRepMetricsData {
        startTime := time.Now()
 
-       // Extract JDBC configuration
+       // 1. Extract configuration
        jdbcConfig, err := extractJDBCConfig(metrics.JDBC)
        if err != nil {
                jc.logger.Error(err, "failed to extract JDBC config")
                return jc.createFailResponse(metrics, CodeFail, 
fmt.Sprintf("JDBC config error: %v", err))
        }
-       if jdbcConfig == nil {
-               return jc.createFailResponse(metrics, CodeFail, "JDBC 
configuration is required")
-       }
-
-       // Debug level only for collection start
-       jc.logger.V(1).Info("starting JDBC collection",
-               "host", jdbcConfig.Host,
-               "platform", jdbcConfig.Platform,
-               "queryType", jdbcConfig.QueryType)
 
-       // Get timeout
+       // 2. Get timeout
        timeout := jc.getTimeout(jdbcConfig.Timeout)
+       ctx, cancel := context.WithTimeout(context.Background(), timeout)
+       defer cancel()
 
-       // Get database URL
-       databaseURL, err := jc.constructDatabaseURL(jdbcConfig)
+       // 3. Set up SSH tunnel (if needed)
+       localHost := jdbcConfig.Host
+       localPort := jdbcConfig.Port
+
+       if jdbcConfig.SSHTunnel != nil {
+               enableTunnel, _ := 
strconv.ParseBool(jdbcConfig.SSHTunnel.Enable)
+               if enableTunnel {
+                       tunnel, lHost, lPort, err := 
startSSHTunnel(jdbcConfig.SSHTunnel, jdbcConfig.Host, jdbcConfig.Port, timeout, 
jc.logger)
+                       if err != nil {
+                               jc.logger.Error(err, "failed to start SSH 
tunnel")
+                               return jc.createFailResponse(metrics, 
CodeUnConnectable, fmt.Sprintf("SSH tunnel error: %v", err))
+                       }
+                       defer tunnel.Close()
+                       localHost = lHost
+                       localPort = lPort
+               }
+       }
+
+       // 4. Construct database URL
+       databaseURL, err := jc.constructDatabaseURL(jdbcConfig, localHost, 
localPort)
        if err != nil {
                jc.logger.Error(err, "failed to construct database URL")
                return jc.createFailResponse(metrics, CodeFail, 
fmt.Sprintf("Database URL error: %v", err))
        }
 
-       // Create database connection
-       db, err := jc.getConnection(databaseURL, jdbcConfig.Username, 
jdbcConfig.Password, timeout)
+       // 5. Create database connection
+       db, err := jc.getConnection(ctx, databaseURL, jdbcConfig.Platform, 
timeout)
        if err != nil {
                jc.logger.Error(err, "failed to connect to database")
+               // Ping failed inside getConnection, return UnConnectable
                return jc.createFailResponse(metrics, CodeUnConnectable, 
fmt.Sprintf("Connection error: %v", err))
        }
        defer db.Close()
 
-       // Execute query based on type with context
+       // 6. Execute query
        response := jc.createSuccessResponse(metrics)
-
        switch jdbcConfig.QueryType {
        case QueryTypeOneRow:
-               err = jc.queryOneRow(db, jdbcConfig.SQL, metrics.AliasFields, 
response)
+               err = jc.queryOneRow(ctx, db, jdbcConfig.SQL, 
metrics.AliasFields, response)
        case QueryTypeMultiRow:
-               err = jc.queryMultiRow(db, jdbcConfig.SQL, metrics.AliasFields, 
response)
+               err = jc.queryMultiRow(ctx, db, jdbcConfig.SQL, 
metrics.AliasFields, response)
        case QueryTypeColumns:
-               err = jc.queryColumns(db, jdbcConfig.SQL, metrics.AliasFields, 
response)
+               err = jc.queryColumns(ctx, db, jdbcConfig.SQL, 
metrics.AliasFields, response)
        case QueryTypeRunScript:
-               err = jc.runScript(db, jdbcConfig.SQL, response)
+               err = jc.runScript(ctx, db, jdbcConfig.SQL, response)
        default:
                err = fmt.Errorf("unsupported query type: %s", 
jdbcConfig.QueryType)
        }
 
+       // 7. Process query results
+       duration := time.Since(startTime)
        if err != nil {
                jc.logger.Error(err, "query execution failed", "queryType", 
jdbcConfig.QueryType)
+               // Check for specific errors
+               if pqErr, ok := err.(*pq.Error); ok {
+                       if pqErr.Code == pqUnreachableCode {
+                               return jc.createFailResponse(metrics, 
CodeUnReachable, fmt.Sprintf("Query error: %v (Code: %s)", err, pqErr.Code))
+                       }
+               }
+               if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
+                       return jc.createFailResponse(metrics, CodeUnReachable, 
fmt.Sprintf("Query timeout: %v", err))
+               }
                return jc.createFailResponse(metrics, CodeFail, 
fmt.Sprintf("Query error: %v", err))
        }
 
-       duration := time.Since(startTime)
-       // Debug level only for successful completion
-       jc.logger.V(1).Info("JDBC collection completed",
-               "duration", duration,
-               "rowCount", len(response.Values))
+       // Inject response time (if response_time field exists)
+       jc.addResponseTime(response, metrics.AliasFields, duration)
 
+       jc.logger.V(1).Info("JDBC collection completed", "duration", duration, 
"rowCount", len(response.Values))
        return response
 }
 
@@ -215,11 +509,12 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string) 
time.Duration {
                return 30 * time.Second // default timeout
        }
 
-       // First try parsing as duration string (e.g., "10s", "5m", "500ms")
+       // Try parsing as Go duration string (e.g., "10s", "5m", "500ms")
        if duration, err := time.ParseDuration(timeoutStr); err == nil {
                return duration
        }
 
+       // Try parsing as Java version's milliseconds
        if timeout, err := strconv.Atoi(timeoutStr); err == nil {
                return time.Duration(timeout) * time.Millisecond
        }
@@ -227,61 +522,171 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string) 
time.Duration {
        return 30 * time.Second // fallback to default
 }
 
-// constructDatabaseURL constructs the database connection URL
-func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol) 
(string, error) {
// recursiveDecode repeatedly percent-decodes s until the value stops
// changing, bounded at five rounds to defeat nested-encoding bypasses.
// On a decode failure it returns the last successfully decoded value
// together with the error.
func recursiveDecode(s string) (string, error) {
	current := s
	for round := 0; round < 5; round++ {
		next, err := url.QueryUnescape(current)
		switch {
		case err != nil:
			// Hand back the last good value so callers can still inspect it.
			return current, err
		case next == current:
			// A fixed point means decoding is complete.
			return current, nil
		}
		current = next
	}
	return current, nil
}
+
+// validateURL performs strict URL security validation
+func validateURL(rawURL string, platform string) error {
+       // 1. Length limit
+       if len(rawURL) > 2048 {
+               return fmt.Errorf("JDBC URL length exceeds maximum limit of 
2048 characters")
+       }
+
+       // 2. Clean invisible characters
+       cleanedUrl := invalidCharRegex.ReplaceAllString(rawURL, "")
+
+       // 3. Recursive decoding
+       decodedUrl, err := recursiveDecode(cleanedUrl)
+       if err != nil {
+               // We can't use jc.logger here as it's a static function
+               // log.Printf("URL decoding error: %v", err)
+               // Continue checking with the pre-decoded string even if 
decoding fails
+               decodedUrl = cleanedUrl
+       }
+
+       urlLowerCase := strings.ToLower(decodedUrl)
+
+       // 4. Check VULNERABLE_KEYWORDS
+       for _, keyword := range vulnerableKeywords {
+               if strings.Contains(urlLowerCase, keyword) {
+                       return fmt.Errorf("JDBC URL prohibit contains 
vulnerable param %s", keyword)
+               }
+       }
+
+       // 5. Check BLACK_LIST
+       for _, keyword := range blackList {
+               if strings.Contains(urlLowerCase, keyword) {
+                       return fmt.Errorf("invalid JDBC URL: contains 
potentially malicious parameter: %s", keyword)
+               }
+       }
+
+       // 6. Check JNDI/Deserialization (simplified)
+       if strings.Contains(urlLowerCase, "jndi:") || 
strings.Contains(urlLowerCase, "ldap:") || strings.Contains(urlLowerCase, 
"rmi:") {
+               return fmt.Errorf("invalid JDBC URL: contains potentially 
malicious JNDI or deserialization parameter")
+       }
+
+       // 7. Check universal bypass patterns
+       for _, pattern := range compiledUniversalBypassPatterns {
+               if pattern.MatchString(urlLowerCase) {
+                       return fmt.Errorf("invalid JDBC URL: contains 
potentially malicious bypass pattern: %s", pattern.String())
+               }
+       }
+
+       // 8. Check platform-specific bypass patterns
+       if platform != "" {
+               if patterns, ok := 
compiledPlatformBypassPatterns[strings.ToLower(platform)]; ok {
+                       for _, pattern := range patterns {
+                               if pattern.MatchString(urlLowerCase) {
+                                       return fmt.Errorf("invalid %s JDBC URL: 
contains potentially malicious bypass pattern: %s", platform, pattern.String())
+                               }
+                       }
+               }
+       }
+
+       return nil
+}
+
+// constructDatabaseURL constructs the database connection URL/DSN
+func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol, 
host, port string) (string, error) {
+       // 1. If user provided a full URL, use it (already validated in 
PreCheck)
+       if jdbc.URL != "" {
+               // Validate again to prevent bypassing PreCheck
+               if err := validateURL(jdbc.URL, jdbc.Platform); err != nil {
+                       return "", err
+               }
+               return jdbc.URL, nil
+       }
 
-       // Construct URL based on platform
-       host := jdbc.Host
-       port := jdbc.Port
+       // 2. Auto-build DSN based on platform
        database := jdbc.Database
+       username := jdbc.Username
+       password := jdbc.Password
 
        switch jdbc.Platform {
        case PlatformMySQL, PlatformMariaDB:
-               // MySQL DSN format: 
[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
-               return 
fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4",
-                       jdbc.Username, jdbc.Password, host, port, database), nil
+               // MySQL DSN: 
[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
+               dsn := 
fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4&timeout=30s&readTimeout=30s&writeTimeout=30s",
+                       username, password, host, port, database)
+               // runScript requires multiStatements support
+               if jdbc.QueryType == QueryTypeRunScript {
+                       dsn += "&multiStatements=true"
+               }
+               return dsn, nil
        case PlatformPostgreSQL:
-               return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable",
-                       jdbc.Username, jdbc.Password, host, port, database), nil
+               // PostgreSQL DSN: 
postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]
+               return 
fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable&connect_timeout=30",
+                       username, password, host, port, database), nil
        case PlatformSQLServer:
-               return 
fmt.Sprintf("sqlserver://%s:%s@%s:%s?database=%s&trustServerCertificate=true",
-                       jdbc.Username, jdbc.Password, host, port, database), nil
+               // SQL Server DSN: 
sqlserver://username:password@host:port?database=dbname&...
+               return 
fmt.Sprintf("sqlserver://%s:%s@%s:%s?database=%s&trustServerCertificate=true&encrypt=disable&connection+timeout=30",
+                       username, password, host, port, database), nil
+       case PlatformOracle:
+               // go-ora DSN: oracle://user:pass@host:port/service_name
+               // Note: Oracle's 'database' field usually corresponds to 
'Service Name'
+               return fmt.Sprintf("oracle://%s:%s@%s:%s/%s",
+                       username, password, host, port, database), nil
        default:
-               return "", fmt.Errorf("unsupported database platform: %s", 
jdbc.Platform)
+               return "", fmt.Errorf("unsupported database platform for URL 
construction: %s", jdbc.Platform)
        }
 }
 
// getConnection opens a database/sql handle for the given DSN and verifies
// connectivity with a context-bounded ping.
//
// The driver is chosen from the declared platform; for user-supplied custom
// URLs (empty or unrecognized platform) it is inferred from the DSN's
// scheme or shape. Returns a ready *sql.DB on success; the caller owns
// Close.
func (jc *JDBCCollector) getConnection(ctx context.Context, databaseURL string, platform string, timeout time.Duration) (*sql.DB, error) {
	// 1. Select driver based on platform
	var driverName string
	switch platform {
	case PlatformMySQL, PlatformMariaDB:
		driverName = "mysql"
	case PlatformPostgreSQL:
		driverName = "postgres"
	case PlatformSQLServer:
		driverName = "sqlserver"
	case PlatformOracle:
		driverName = "oracle"
	default:
		// Try to infer from URL (for user-customized URLs)
		if strings.HasPrefix(databaseURL, "postgres:") {
			driverName = "postgres"
		} else if strings.HasPrefix(databaseURL, "sqlserver:") {
			driverName = "sqlserver"
		} else if strings.HasPrefix(databaseURL, "oracle:") {
			driverName = "oracle"
		} else if strings.Contains(databaseURL, "@tcp(") { // MySQL DSN format
			driverName = "mysql"
		} else {
			return nil, fmt.Errorf("unsupported platform or ambiguous URL: %s", platform)
		}
	}

	// 2. Open database connection (lazy: no network I/O until the ping below)
	db, err := sql.Open(driverName, databaseURL)
	if err != nil {
		return nil, fmt.Errorf("failed to open database: %w", err)
	}

	// 3. Set connection pool parameters (small pool: this is a short-lived
	// per-collection handle, closed by the caller after the query)
	db.SetConnMaxLifetime(timeout)
	db.SetMaxOpenConns(5)
	db.SetMaxIdleConns(2)
	db.SetConnMaxIdleTime(30 * time.Second)

	// 4. Test connection (Ping)
	// Use the incoming context which carries the overall collection timeout
	if err := db.PingContext(ctx); err != nil {
		db.Close()
		return nil, fmt.Errorf("failed to ping database: %w", err)
	}

	return db, nil
}
 
+// convertRowValues converts raw database values (interface{}) to a string 
slice
+// Deprecated: scan directly into sql.NullString instead.
+func (jc *JDBCCollector) convertRowValues(rawValues []interface{}, columns 
[]sql.NullString) []string {
+       stringValues := make([]string, len(rawValues))
+       for i, rawValue := range rawValues {
+               if rawValue == nil {
+                       stringValues[i] = constants.NULL_VALUE // Use "NULL" 
defined in Java version
+                       continue
+               }
+
+               // Prefer the NullString value already populated by Scan
+               if columns[i].Valid {
+                       stringValues[i] = columns[i].String
+               } else if rawValue != nil {
+                       // Fallback to fmt.Sprintf
+                       switch v := rawValue.(type) {
+                       case []byte:
+                               stringValues[i] = string(v) // Handle byte 
arrays
+                       case time.Time:
+                               stringValues[i] = v.Format(time.RFC3339) // 
Unify time format
+                       default:
+                               stringValues[i] = fmt.Sprintf("%v", v)
+                       }
+               } else {
+                       stringValues[i] = constants.NULL_VALUE
+               }
+       }
+       return stringValues
+}
+
 // queryOneRow executes a query and returns only the first row
-func (jc *JDBCCollector) queryOneRow(db *sql.DB, sqlQuery string, aliasFields 
[]string, response *jobtypes.CollectRepMetricsData) error {
-       rows, err := db.Query(sqlQuery)
+func (jc *JDBCCollector) queryOneRow(ctx context.Context, db *sql.DB, sqlQuery 
string, aliasFields []string, response *jobtypes.CollectRepMetricsData) error {
+       // Java version uses setMaxRows(1), which Go's standard library doesn't 
support.
+       // We simulate this by only reading the first row.
+       rows, err := db.QueryContext(ctx, sqlQuery)
        if err != nil {
                return fmt.Errorf("query execution failed: %w", err)
        }
@@ -304,37 +741,29 @@ func (jc *JDBCCollector) queryOneRow(db *sql.DB, sqlQuery 
string, aliasFields []
                return fmt.Errorf("failed to get columns: %w", err)
        }
 
-       // Build fields
-       for i, column := range columns {
-               field := jobtypes.Field{
-                       Field:    column,
-                       Type:     1, // String type
-                       Label:    false,
-                       Unit:     "",
-                       Instance: i == 0,
-               }
-               response.Fields = append(response.Fields, field)
-       }
+       // Build fields (using aliasFields or original column names)
+       fieldNames := jc.buildFields(response, aliasFields, columns)
 
-       // Get first row
+       // Get the first row
        if rows.Next() {
-               values := make([]interface{}, len(columns))
-               valuePtrs := make([]interface{}, len(columns))
+               // Use sql.NullString to handle NULL values correctly
+               values := make([]interface{}, len(fieldNames))
+               scanValues := make([]sql.NullString, len(fieldNames))
                for i := range values {
-                       valuePtrs[i] = &values[i]
+                       values[i] = &scanValues[i]
                }
 
-               if err := rows.Scan(valuePtrs...); err != nil {
+               if err := rows.Scan(values...); err != nil {
                        return fmt.Errorf("failed to scan row: %w", err)
                }
 
-               // Convert to string values
-               stringValues := make([]string, len(values))
-               for i, v := range values {
-                       if v != nil {
-                               stringValues[i] = fmt.Sprintf("%v", v)
+               // Convert
+               stringValues := make([]string, len(fieldNames))
+               for i, sv := range scanValues {
+                       if sv.Valid {
+                               stringValues[i] = sv.String
                        } else {
-                               stringValues[i] = ""
+                               stringValues[i] = constants.NULL_VALUE
                        }
                }
 
@@ -343,12 +772,12 @@ func (jc *JDBCCollector) queryOneRow(db *sql.DB, sqlQuery 
string, aliasFields []
                })
        }
 
-       return nil
+       return rows.Err()
 }
 
 // queryMultiRow executes a query and returns multiple rows
-func (jc *JDBCCollector) queryMultiRow(db *sql.DB, sqlQuery string, 
aliasFields []string, response *jobtypes.CollectRepMetricsData) error {
-       rows, err := db.Query(sqlQuery)
+func (jc *JDBCCollector) queryMultiRow(ctx context.Context, db *sql.DB, 
sqlQuery string, aliasFields []string, response 
*jobtypes.CollectRepMetricsData) error {
+       rows, err := db.QueryContext(ctx, sqlQuery)
        if err != nil {
                return fmt.Errorf("query execution failed: %w", err)
        }
@@ -361,36 +790,27 @@ func (jc *JDBCCollector) queryMultiRow(db *sql.DB, 
sqlQuery string, aliasFields
        }
 
        // Build fields
-       for i, column := range columns {
-               field := jobtypes.Field{
-                       Field:    column,
-                       Type:     1, // String type
-                       Label:    false,
-                       Unit:     "",
-                       Instance: i == 0,
-               }
-               response.Fields = append(response.Fields, field)
-       }
+       fieldNames := jc.buildFields(response, aliasFields, columns)
 
        // Get all rows
        for rows.Next() {
-               values := make([]interface{}, len(columns))
-               valuePtrs := make([]interface{}, len(columns))
+               values := make([]interface{}, len(fieldNames))
+               scanValues := make([]sql.NullString, len(fieldNames))
                for i := range values {
-                       valuePtrs[i] = &values[i]
+                       values[i] = &scanValues[i]
                }
 
-               if err := rows.Scan(valuePtrs...); err != nil {
+               if err := rows.Scan(values...); err != nil {
                        return fmt.Errorf("failed to scan row: %w", err)
                }
 
-               // Convert to string values
-               stringValues := make([]string, len(values))
-               for i, v := range values {
-                       if v != nil {
-                               stringValues[i] = fmt.Sprintf("%v", v)
+               // Convert
+               stringValues := make([]string, len(fieldNames))
+               for i, sv := range scanValues {
+                       if sv.Valid {
+                               stringValues[i] = sv.String
                        } else {
-                               stringValues[i] = ""
+                               stringValues[i] = constants.NULL_VALUE
                        }
                }
 
@@ -403,92 +823,158 @@ func (jc *JDBCCollector) queryMultiRow(db *sql.DB, 
sqlQuery string, aliasFields
 }
 
 // queryColumns executes a query and matches two columns (similar to the Java 
implementation)
-func (jc *JDBCCollector) queryColumns(db *sql.DB, sqlQuery string, aliasFields 
[]string, response *jobtypes.CollectRepMetricsData) error {
-       rows, err := db.Query(sqlQuery)
+func (jc *JDBCCollector) queryColumns(ctx context.Context, db *sql.DB, 
sqlQuery string, aliasFields []string, response 
*jobtypes.CollectRepMetricsData) error {
+       rows, err := db.QueryContext(ctx, sqlQuery)
        if err != nil {
                return fmt.Errorf("query execution failed: %w", err)
        }
        defer rows.Close()
 
-       // Get column names
+       // Get column names (ensure at least two columns)
        columns, err := rows.Columns()
        if err != nil {
                return fmt.Errorf("failed to get columns: %w", err)
        }
-
        if len(columns) < 2 {
                return fmt.Errorf("columns query type requires at least 2 
columns, got %d", len(columns))
        }
 
-       // Build fields from alias fields or default columns
-       var fieldNames []string
-       if len(aliasFields) > 0 {
-               fieldNames = aliasFields
-       } else {
-               fieldNames = columns
+       // Build fields (must use aliasFields)
+       if len(aliasFields) == 0 {
+               return fmt.Errorf("columns query type requires aliasFields to 
be defined")
        }
+       fieldNames := jc.buildFields(response, aliasFields, nil)
 
-       for i, fieldName := range fieldNames {
-               field := jobtypes.Field{
-                       Field:    fieldName,
-                       Type:     1, // String type
-                       Label:    false,
-                       Unit:     "",
-                       Instance: i == 0,
-               }
-               response.Fields = append(response.Fields, field)
-       }
-
-       // Create value row with column mappings
-       valueRow := jobtypes.ValueRow{
-               Columns: make([]string, len(fieldNames)),
-       }
-
-       // Process rows to build key-value mapping
+       // Create K-V map
        keyValueMap := make(map[string]string)
        for rows.Next() {
-               values := make([]interface{}, len(columns))
-               valuePtrs := make([]interface{}, len(columns))
-               for i := range values {
-                       valuePtrs[i] = &values[i]
+               var key, value sql.NullString
+
+               // Capture the first two columns; any extra columns are discarded
+               // Handle varying number of columns in result set gracefully
+               scanArgs := make([]interface{}, len(columns))
+               scanArgs[0] = &key
+               scanArgs[1] = &value
+               for i := 2; i < len(columns); i++ {
+                       scanArgs[i] = new(sql.RawBytes) // Discard extra columns
                }
 
-               if err := rows.Scan(valuePtrs...); err != nil {
-                       return fmt.Errorf("failed to scan row: %w", err)
+               if err := rows.Scan(scanArgs...); err != nil {
+                       // If scan fails (e.g., column mismatch), log warning 
and continue
+                       jc.logger.Error(err, "failed to scan row in columns 
query")
+                       continue
                }
 
-               // Use first column as key, second as value
-               key := ""
-               value := ""
-               if values[0] != nil {
-                       key = fmt.Sprintf("%v", values[0])
-               }
-               if len(values) > 1 && values[1] != nil {
-                       value = fmt.Sprintf("%v", values[1])
+               if key.Valid {
+                       // Corresponds to Java: 
values.put(resultSet.getString(1).toLowerCase().trim(), resultSet.getString(2));
+                       keyString := 
strings.ToLower(strings.TrimSpace(key.String))
+                       if value.Valid {
+                               keyValueMap[keyString] = value.String
+                       } else {
+                               keyValueMap[keyString] = constants.NULL_VALUE
+                       }
                }
-               keyValueMap[key] = value
+       }
+       if err := rows.Err(); err != nil {
+               return err
        }
 
-       // Map values to alias fields
+       // Map to the result row
+       valueRow := jobtypes.ValueRow{
+               Columns: make([]string, len(fieldNames)),
+       }
        for i, fieldName := range fieldNames {
-               if value, exists := keyValueMap[fieldName]; exists {
+               // Corresponds to Java: String value = 
values.get(column.toLowerCase());
+               if value, exists := keyValueMap[strings.ToLower(fieldName)]; 
exists {
                        valueRow.Columns[i] = value
                } else {
-                       valueRow.Columns[i] = ""
+                       // Check if it's response_time
+                       if fieldName == constants.RESPONSE_TIME {
+                               // This value will be populated at the end of 
Collect by addResponseTime
+                               valueRow.Columns[i] = "0"
+                       } else {
+                               valueRow.Columns[i] = constants.NULL_VALUE
+                       }
                }
        }
 
        response.Values = append(response.Values, valueRow)
-       return rows.Err()
+       return nil
 }
 
-// runScript executes a SQL script (placeholder implementation)
-func (jc *JDBCCollector) runScript(db *sql.DB, scriptPath string, response 
*jobtypes.CollectRepMetricsData) error {
-       // For security reasons, we'll just return success without actually 
running scripts
-       jc.logger.Info("script execution requested but disabled for security", 
"scriptPath", scriptPath)
+// runScript executes a SQL script (as content, not file path)
+func (jc *JDBCCollector) runScript(ctx context.Context, db *sql.DB, 
scriptContent string, response *jobtypes.CollectRepMetricsData) error {
+       // The Java version uses ScriptUtils to execute a file; here we execute 
the content from the SQL field.
+       // Note: DSN must have multiStatements=true enabled (e.g., for MySQL).
+       result, err := db.ExecContext(ctx, scriptContent)
+       if err != nil {
+               return fmt.Errorf("script execution failed: %w", err)
+       }
+
+       // Script execution usually doesn't return rows, but we can return the 
affected rows.
+       // We define a "rows_affected" field.
+       jc.buildFields(response, nil, []string{"rows_affected"})
+       rowsAffected, err := result.RowsAffected()
+       if err != nil {
+               // Some drivers (like PostgreSQL) might not support 
RowsAffected for scripts
+               jc.logger.V(1).Info("could not get rows affected from script 
execution", "error", err)
+               response.Values = append(response.Values, jobtypes.ValueRow{
+                       Columns: []string{"0"},
+               })
+       } else {
+               response.Values = append(response.Values, jobtypes.ValueRow{
+                       Columns: []string{strconv.FormatInt(rowsAffected, 10)},
+               })
+       }
+
        return nil
 }
 
+// buildFields populates the response.Fields slice based on aliasFields or 
columns
+// It returns the list of field names that should be used for value mapping.
+func (jc *JDBCCollector) buildFields(response *jobtypes.CollectRepMetricsData, 
aliasFields []string, columns []string) []string {
+       var fieldNames []string
+       if len(aliasFields) > 0 {
+               fieldNames = aliasFields
+       } else {
+               fieldNames = columns
+       }
+
+       response.Fields = make([]jobtypes.Field, len(fieldNames))
+       for i, fieldName := range fieldNames {
+               response.Fields[i] = jobtypes.Field{
+                       Field:    fieldName,
+                       Type:     constants.TYPE_STRING,
+                       Label:    false,
+                       Unit:     "",
+                       Instance: i == 0, // Default first as instance
+               }
+       }
+       return fieldNames
+}
+
+// addResponseTime checks if response_time is requested and adds it to all 
value rows
+func (jc *JDBCCollector) addResponseTime(response 
*jobtypes.CollectRepMetricsData, aliasFields []string, duration time.Duration) {
+       rtIndex := -1
+       for i, field := range aliasFields {
+               if field == constants.RESPONSE_TIME {
+                       rtIndex = i
+                       break
+               }
+       }
+
+       if rtIndex == -1 {
+               return
+       }
+
+       responseTimeMs := strconv.FormatInt(duration.Milliseconds(), 10)
+       for i := range response.Values {
+               if len(response.Values[i].Columns) > rtIndex {
+                       response.Values[i].Columns[rtIndex] = responseTimeMs
+               }
+       }
+}
+
 // createSuccessResponse creates a successful metrics data response
 func (jc *JDBCCollector) createSuccessResponse(metrics *jobtypes.Metrics) 
*jobtypes.CollectRepMetricsData {
        return &jobtypes.CollectRepMetricsData{
@@ -498,7 +984,7 @@ func (jc *JDBCCollector) createSuccessResponse(metrics 
*jobtypes.Metrics) *jobty
                Metrics:   metrics.Name,
                Priority:  0,
                Time:      time.Now().UnixMilli(),
-               Code:      200, // Success
+               Code:      CodeSuccess,
                Msg:       "success",
                Fields:    make([]jobtypes.Field, 0),
                Values:    make([]jobtypes.ValueRow, 0),
diff --git a/internal/collector/basic/ssh/ssh_collector.go 
b/internal/collector/basic/ssh/ssh_collector.go
index 39ef137..79e4370 100644
--- a/internal/collector/basic/ssh/ssh_collector.go
+++ b/internal/collector/basic/ssh/ssh_collector.go
@@ -38,7 +38,7 @@ const (
 
        // Special fields
        ResponseTime = "responseTime"
-       NullValue    = consts.NullValue
+       NullValue    = consts.NULL_VALUE
 
        // Parse types
        ParseTypeOneRow   = "oneRow"   // Each line corresponds to one field 
value, in order of aliasFields
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go 
b/internal/collector/common/collect/dispatch/metrics_collector.go
index 1a36d52..c4aea7e 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -85,7 +85,7 @@ func (mc *MetricsCollector) CalculateFields(metrics 
*jobtypes.Metrics, result *j
                                break
                        }
                        aliasFieldValue := aliasRow.Columns[aliasIndex]
-                       if aliasFieldValue == constants.NullValue {
+                       if aliasFieldValue == constants.NULL_VALUE {
                                fieldValueMap[aliasField] = nil
                                stringTypeFieldValueMap[aliasField] = nil
                                continue
@@ -115,7 +115,7 @@ func (mc *MetricsCollector) CalculateFields(metrics 
*jobtypes.Metrics, result *j
 
                        if program, exists := fieldExpressionMap[realField]; 
exists {
                                var context map[string]interface{}
-                               if field.Type == constants.TypeString {
+                               if field.Type == constants.TYPE_STRING {
                                        context = stringTypeFieldValueMap
                                } else {
                                        context = fieldValueMap
@@ -146,9 +146,9 @@ func (mc *MetricsCollector) CalculateFields(metrics 
*jobtypes.Metrics, result *j
                        }
 
                        // Process based on field type if value exists
-                       if value != "" && value != constants.NullValue {
+                       if value != "" && value != constants.NULL_VALUE {
                                switch field.Type {
-                               case constants.TypeNumber:
+                               case constants.TYPE_NUMBER:
                                        // Extract numeric value and format
                                        doubleAndUnit := 
replacer.ExtractDoubleAndUnitFromStr(value)
                                        if doubleAndUnit != nil {
@@ -180,24 +180,24 @@ func (mc *MetricsCollector) CalculateFields(metrics 
*jobtypes.Metrics, result *j
                                                }
                                                value = 
formatNumber(numericValue)
                                        } else {
-                                               value = constants.NullValue
+                                               value = constants.NULL_VALUE
                                        }
 
-                               case constants.TypeTime:
+                               case constants.TYPE_TIME:
                                        // TODO: Implement time parsing
                                        // For now, keep original value
                                }
                        }
 
                        if value == "" {
-                               value = constants.NullValue
+                               value = constants.NULL_VALUE
                        }
 
                        realValueRow.Columns = append(realValueRow.Columns, 
value)
 
                        // Add the calculated field value to context maps for 
use in subsequent expressions
                        // This allows later expressions to reference earlier 
calculated fields
-                       if value != constants.NullValue {
+                       if value != constants.NULL_VALUE {
                                doubleAndUnit := 
replacer.ExtractDoubleAndUnitFromStr(value)
                                if doubleAndUnit != nil {
                                        fieldValueMap[realField] = 
doubleAndUnit.Value
diff --git 
a/internal/collector/common/collect/dispatch/metrics_collector_test.go 
b/internal/collector/common/collect/dispatch/metrics_collector_test.go
index 07186a3..bae124d 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector_test.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector_test.go
@@ -47,9 +47,9 @@ func TestCalculateFieldsWithUnitConversion(t *testing.T) {
                                Name:     "memory",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "total", Type: 
constants.TypeNumber},
-                                       {Field: "used", Type: 
constants.TypeNumber},
-                                       {Field: "free", Type: 
constants.TypeNumber},
+                                       {Field: "total", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "used", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "free", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"total", "used", "free"},
                                Units:       []string{"total=B->MB", 
"used=B->MB", "free=B->MB"},
@@ -70,8 +70,8 @@ func TestCalculateFieldsWithUnitConversion(t *testing.T) {
                                Name:     "disk",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "size", Type: 
constants.TypeNumber},
-                                       {Field: "usage", Type: 
constants.TypeString},
+                                       {Field: "size", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "usage", Type: 
constants.TYPE_STRING},
                                },
                                AliasFields: []string{"size", "usage"},
                                Units:       []string{"size=KB->GB"},
@@ -92,7 +92,7 @@ func TestCalculateFieldsWithUnitConversion(t *testing.T) {
                                Name:     "memory",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "committed", Type: 
constants.TypeNumber},
+                                       {Field: "committed", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"committed"},
                                Units:       []string{"committed=B->MB"},
@@ -113,8 +113,8 @@ func TestCalculateFieldsWithUnitConversion(t *testing.T) {
                                Name:     "cpu",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "usage", Type: 
constants.TypeNumber},
-                                       {Field: "cores", Type: 
constants.TypeNumber},
+                                       {Field: "usage", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "cores", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"usage", "cores"},
                        },
@@ -181,8 +181,8 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
                                Name:     "memory",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "total", Type: 
constants.TypeNumber},
-                                       {Field: "used", Type: 
constants.TypeNumber},
+                                       {Field: "total", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "used", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"total_bytes", 
"used_bytes"},
                                Calculates:  []string{"total=total_bytes", 
"used=used_bytes"},
@@ -203,9 +203,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
                                Name:     "memory",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "total", Type: 
constants.TypeNumber},
-                                       {Field: "used", Type: 
constants.TypeNumber},
-                                       {Field: "usage", Type: 
constants.TypeNumber},
+                                       {Field: "total", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "used", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "usage", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"total", "used"},
                                Calculates:  []string{"usage=(used / total) * 
100"},
@@ -226,10 +226,10 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
                                Name:     "disk",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "total", Type: 
constants.TypeNumber},
-                                       {Field: "used", Type: 
constants.TypeNumber},
-                                       {Field: "free", Type: 
constants.TypeNumber},
-                                       {Field: "usage_percent", Type: 
constants.TypeNumber},
+                                       {Field: "total", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "used", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "free", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "usage_percent", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"total_space", 
"used_space", "free_space", "usage"},
                                Calculates: []string{
@@ -255,9 +255,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
                                Name:     "memory",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "total", Type: 
constants.TypeNumber},
-                                       {Field: "used", Type: 
constants.TypeNumber},
-                                       {Field: "free", Type: 
constants.TypeNumber},
+                                       {Field: "total", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "used", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "free", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"total_mb", "used_mb", 
"free_mb"},
                                Calculates: []jobtypes.Calculate{
@@ -282,10 +282,10 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
                                Name:     "jvm",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "heap_max", Type: 
constants.TypeNumber},
-                                       {Field: "heap_used", Type: 
constants.TypeNumber},
-                                       {Field: "heap_free", Type: 
constants.TypeNumber},
-                                       {Field: "heap_usage", Type: 
constants.TypeNumber},
+                                       {Field: "heap_max", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "heap_used", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "heap_free", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "heap_usage", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"max", "used", "free", 
"usage"},
                                Calculates: []jobtypes.Calculate{
@@ -311,8 +311,8 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
                                Name:     "cpu",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "idle", Type: 
constants.TypeNumber},
-                                       {Field: "usage", Type: 
constants.TypeNumber},
+                                       {Field: "idle", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "usage", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"idle_percent", 
"usage_percent"},
                                Calculates:  []interface{}{"idle=idle_percent", 
"usage=usage_percent"},
@@ -333,9 +333,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
                                Name:     "network",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "rx_bytes", Type: 
constants.TypeNumber},
-                                       {Field: "tx_bytes", Type: 
constants.TypeNumber},
-                                       {Field: "total_bytes", Type: 
constants.TypeNumber},
+                                       {Field: "rx_bytes", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "tx_bytes", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "total_bytes", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"received", 
"transmitted", "total"},
                                Calculates: []interface{}{
@@ -360,9 +360,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
                                Name:     "system",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "load1", Type: 
constants.TypeNumber},
-                                       {Field: "load5", Type: 
constants.TypeNumber},
-                                       {Field: "load15", Type: 
constants.TypeNumber},
+                                       {Field: "load1", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "load5", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "load15", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"load_1min", "load_5min", 
"load_15min"},
                                Calculates: []interface{}{
@@ -387,9 +387,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
                                Name:     "database",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "connections", Type: 
constants.TypeNumber},
-                                       {Field: "max_connections", Type: 
constants.TypeNumber},
-                                       {Field: "usage_rate", Type: 
constants.TypeNumber},
+                                       {Field: "connections", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "max_connections", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "usage_rate", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"active", "max"},
                                Calculates: []interface{}{
@@ -414,9 +414,9 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
                                Name:     "storage",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "total", Type: 
constants.TypeNumber},
-                                       {Field: "used", Type: 
constants.TypeNumber},
-                                       {Field: "available", Type: 
constants.TypeNumber},
+                                       {Field: "total", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "used", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "available", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"capacity", "occupied", 
"free_space"},
                                Calculates: []interface{}{
@@ -441,7 +441,7 @@ func TestCalculateFieldsWithCalculates(t *testing.T) {
                                Name:     "simple",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "value", Type: 
constants.TypeNumber},
+                                       {Field: "value", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"value"},
                        },
@@ -614,9 +614,9 @@ func TestCalculateFieldsWithUnitStructFormat(t *testing.T) {
                                Name:     "memory",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "total", Type: 
constants.TypeNumber},
-                                       {Field: "used", Type: 
constants.TypeNumber},
-                                       {Field: "free", Type: 
constants.TypeNumber},
+                                       {Field: "total", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "used", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "free", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"total", "used", "free"},
                                Units: []jobtypes.Unit{
@@ -641,9 +641,9 @@ func TestCalculateFieldsWithUnitStructFormat(t *testing.T) {
                                Name:     "storage",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "disk_total", Type: 
constants.TypeNumber},
-                                       {Field: "disk_used", Type: 
constants.TypeNumber},
-                                       {Field: "disk_free", Type: 
constants.TypeNumber},
+                                       {Field: "disk_total", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "disk_used", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "disk_free", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"disk_total", 
"disk_used", "disk_free"},
                                Units: []interface{}{
@@ -668,9 +668,9 @@ func TestCalculateFieldsWithUnitStructFormat(t *testing.T) {
                                Name:     "jvm",
                                Priority: 0,
                                Fields: []jobtypes.Field{
-                                       {Field: "heap_max", Type: 
constants.TypeNumber},
-                                       {Field: "heap_used", Type: 
constants.TypeNumber},
-                                       {Field: "heap_usage", Type: 
constants.TypeNumber},
+                                       {Field: "heap_max", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "heap_used", Type: 
constants.TYPE_NUMBER},
+                                       {Field: "heap_usage", Type: 
constants.TYPE_NUMBER},
                                },
                                AliasFields: []string{"max_bytes", 
"used_bytes"},
                                Calculates:  []string{"heap_max=max_bytes", 
"heap_used=used_bytes", "heap_usage=(heap_used / heap_max) * 100"},
diff --git a/internal/collector/common/types/job/metrics_types.go 
b/internal/collector/common/types/job/metrics_types.go
index ef88be8..6474be1 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/collector/common/types/job/metrics_types.go
@@ -205,18 +205,18 @@ type SSHProtocol struct {
 
 // JDBCProtocol represents JDBC protocol configuration
 type JDBCProtocol struct {
-       Host            string                 `json:"host"`
-       Port            string                 `json:"port"`
-       Platform        string                 `json:"platform"`
-       Username        string                 `json:"username"`
-       Password        string                 `json:"password"`
-       Database        string                 `json:"database"`
-       Timeout         string                 `json:"timeout"`
-       QueryType       string                 `json:"queryType"`
-       SQL             string                 `json:"sql"`
-       URL             string                 `json:"url"`
-       ReuseConnection string                 `json:"reuseConnection"`
-       SSHTunnel       map[string]interface{} `json:"sshTunnel,omitempty"`
+       Host            string     `json:"host"`
+       Port            string     `json:"port"`
+       Platform        string     `json:"platform"`
+       Username        string     `json:"username"`
+       Password        string     `json:"password"`
+       Database        string     `json:"database"`
+       Timeout         string     `json:"timeout"`
+       QueryType       string     `json:"queryType"`
+       SQL             string     `json:"sql"`
+       URL             string     `json:"url"`
+       ReuseConnection string     `json:"reuseConnection"`
+       SSHTunnel       *SSHTunnel `json:"sshTunnel,omitempty"`
 }
 
 // SNMPProtocol represents SNMP protocol configuration
diff --git a/internal/constants/const.go b/internal/constants/const.go
index 6895fcb..d6ac8de 100644
--- a/internal/constants/const.go
+++ b/internal/constants/const.go
@@ -1,49 +1,78 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
+// Package constants defines public common constants,
+// ported from org.apache.hertzbeat.common.constants.CommonConstants
 package constants
 
-// System constants
+// Collect Response status code (Go Collector specific)
 const (
-       KeyWord   = "keyword"
-       NullValue = "&nbsp;"
+       CollectSuccess       = 0 // Generic success
+       CollectUnavailable   = 1 // Service unavailable (e.g., target service 
error)
+       CollectUnReachable   = 2 // Network unreachable (e.g., network timeout, 
DNS fail)
+       CollectUnConnectable = 3 // Connection failed (e.g., port closed, SSH 
auth fail)
+       CollectFail          = 4 // Generic failure (e.g., query error, script 
exec fail)
+       CollectTimeout       = 5 // Collection timeout
 )
 
-// Field type constants
+// Field parameter types (Aligned with CommonConstants.java)
 const (
-       TypeNumber = 0
-       TypeString = 1
-       TypeTime   = 2
+       TYPE_NUMBER = 0 // Field parameter type: number
+       TYPE_STRING = 1 // Field parameter type: String
+       TYPE_SECRET = 2 // Field parameter type: encrypted string
+       TYPE_TIME   = 3 // Field parameter type: time
 )
 
-// Service related constants
+// Monitoring status (Aligned with CommonConstants.java)
 const (
-       MongoDbAtlasModel = "mongodb-atlas"
+       MONITOR_PAUSED_CODE = 0 // 0: Paused
+       MONITOR_UP_CODE     = 1 // 1: Up
+       MONITOR_DOWN_CODE   = 2 // 2: Down
+)
 
-       PostgreSQLUnReachAbleCode = "08001"
-       ZookeeperApp              = "zookeeper"
-       ZookeeperEnviHeader       = "Environment:"
+// Common metric/label keys
+const (
+       // Collection metric value: null placeholder for empty value
+       NULL_VALUE = "&nbsp;"
+
+       // Common metric keys
+       ErrorMsg      = "errorMsg"
+       RESPONSE_TIME = "responseTime"
+       StatusCode    = "statusCode"
+
+       // Label keys (Aligned with CommonConstants.java)
+       LABEL_INSTANCE       = "instance"
+       LABEL_DEFINE_ID      = "defineid"
+       LABEL_ALERT_NAME     = "alertname"
+       LABEL_INSTANCE_HOST  = "instancehost"
+       LABEL_INSTANCE_NAME  = "instancename"
+       LABEL_ALERT_SEVERITY = "severity"
 )
 
-// API constants
+// Service specific constants
 const (
-       ErrorMsg     = "errorMsg"
-       ResponseTime = "responseTime"
-       StatusCode   = "statusCode"
+       MongoDbAtlasModel = "mongodb-atlas"
+
+       // PostgreSQLUnReachAbleCode Specific SQLState for connection failure
+       PostgreSQLUnReachAbleCode = "08001"
+       // ZookeeperApp App name for Zookeeper
+       ZookeeperApp = "zookeeper"
+       // ZookeeperEnviHeader Header string in Zookeeper 'envi' command output
+       ZookeeperEnviHeader = "Environment:"
 )
 
 // Function related constants
@@ -51,12 +80,11 @@ const (
        CollectorModule = "collector"
 )
 
-// Collect Response status code
+// Legacy or alias constants
+// These are kept for compatibility with previous Go code logic
 const (
-       CollectSuccess       = 0
-       CollectUnavailable   = 1
-       CollectUnReachable   = 2
-       CollectUnConnectable = 3
-       CollectFail          = 4
-       CollectTimeout       = 5
+       // FieldTypeString Alias for TYPE_STRING
+       FieldTypeString = TYPE_STRING
+       // KeyWord Deprecated placeholder
+       KeyWord = "keyword"
 )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to