This is an automated email from the ASF dual-hosted git repository. gaoxingcun pushed a commit to branch feature/go-collector-arrow-communication in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit d8c7becd272bcc855795e07f2d9c01d76e1a221f Author: TJxiaobao <[email protected]> AuthorDate: Tue Oct 14 22:51:47 2025 +0800 feat: implement Apache Arrow data communication with Java Manager 1、Add Apache Arrow serialization for metrics data compatibility 2、Implement complete result handling and message routing 3、Fix MonitorID matching between Go collector and Java Manager 4、Add one-time and cyclic task result aggregation 5、Integrate message router with job scheduler --- Dockerfile | 4 +- internal/cmd/server.go | 19 +- .../collector/basic/database/jdbc_collector.go | 105 +++--- .../common/collect/dispatch/metrics_collector.go | 10 +- .../common/collect/lazy_message_router.go | 417 +++++++++++++++++++++ .../collector/common/collect/result_handler.go | 9 +- .../common/dispatcher/common_dispatcher.go | 67 +++- .../common/dispatcher/hashed_wheel_timer.go | 41 +- internal/collector/common/dispatcher/time_wheel.go | 321 +++++++++++++++- .../common/dispatcher/wheel_timer_task.go | 57 ++- internal/collector/common/router/message_router.go | 269 +++++++++++++ internal/collector/common/transport/transport.go | 28 +- internal/collector/common/types/job/job_types.go | 9 +- .../collector/common/types/job/metrics_types.go | 111 ++++-- internal/transport/netty_client.go | 8 +- internal/transport/processors.go | 265 +++++++++++-- internal/util/arrow/arrow_serializer.go | 151 ++++++++ internal/util/crypto/aes_util.go | 194 ++++++++++ internal/util/param/param_replacer.go | 391 +++++++++++++++++++ 19 files changed, 2303 insertions(+), 173 deletions(-) diff --git a/Dockerfile b/Dockerfile index 74c38a7..eb17107 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -FROM golang:1.23-alpine AS golang-builder +FROM golang:1.25-alpine3.22 AS golang-builder ARG GOPROXY # ENV GOPROXY ${GOPROXY:-direct} @@ -28,7 +28,7 @@ ENV BUILD_DIR /app COPY . 
${BUILD_DIR} WORKDIR ${BUILD_DIR} -RUN apk --no-cache add build-base git bash +RUN apk --no-cache add build-base git bash golangci-lint RUN make init && \ make fmt && \ diff --git a/internal/cmd/server.go b/internal/cmd/server.go index 5ff4c45..1e98ceb 100644 --- a/internal/cmd/server.go +++ b/internal/cmd/server.go @@ -108,17 +108,20 @@ func server(ctx context.Context, logOut io.Writer) error { } func startRunners(ctx context.Context, cfg *clrserver.Server) error { + // Create job server first + jobRunner := jobserver.New(&jobserver.Config{ + Server: *cfg, + }) + + // Create transport server and connect it to job server + transportRunner := transportserver.NewFromConfig(cfg.Config) + transportRunner.SetJobScheduler(jobRunner) // Connect transport to job scheduler + runners := []struct { runner Runner[collectortypes.Info] }{ - { - jobserver.New(&jobserver.Config{ - Server: *cfg, - }), - }, - { - transportserver.NewFromConfig(cfg.Config), - }, + {jobRunner}, + {transportRunner}, // todo; add metrics } diff --git a/internal/collector/basic/database/jdbc_collector.go b/internal/collector/basic/database/jdbc_collector.go index 01fa7f2..dd74b2f 100644 --- a/internal/collector/basic/database/jdbc_collector.go +++ b/internal/collector/basic/database/jdbc_collector.go @@ -30,6 +30,7 @@ import ( _ "github.com/microsoft/go-mssqldb" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" ) const ( @@ -67,6 +68,13 @@ func NewJDBCCollector(logger logger.Logger) *JDBCCollector { } } +// extractJDBCConfig extracts JDBC configuration from interface{} type +// This function uses the parameter replacer for consistent configuration extraction +func extractJDBCConfig(jdbcInterface interface{}) (*jobtypes.JDBCProtocol, error) { + replacer := param.NewReplacer() + return replacer.ExtractJDBCConfig(jdbcInterface) +} + // 
PreCheck validates the JDBC metrics configuration func (jc *JDBCCollector) PreCheck(metrics *jobtypes.Metrics) error { if metrics == nil { @@ -77,44 +85,51 @@ func (jc *JDBCCollector) PreCheck(metrics *jobtypes.Metrics) error { return fmt.Errorf("JDBC protocol configuration is required") } - jdbc := metrics.JDBC + // Extract JDBC configuration + jdbcConfig, err := extractJDBCConfig(metrics.JDBC) + if err != nil { + return fmt.Errorf("invalid JDBC configuration: %w", err) + } + if jdbcConfig == nil { + return fmt.Errorf("JDBC configuration is required") + } // Validate required fields when URL is not provided - if jdbc.URL == "" { - if jdbc.Host == "" { + if jdbcConfig.URL == "" { + if jdbcConfig.Host == "" { return fmt.Errorf("host is required when URL is not provided") } - if jdbc.Port == "" { + if jdbcConfig.Port == "" { return fmt.Errorf("port is required when URL is not provided") } - if jdbc.Platform == "" { + if jdbcConfig.Platform == "" { return fmt.Errorf("platform is required when URL is not provided") } } // Validate platform - if jdbc.Platform != "" { - switch jdbc.Platform { + if jdbcConfig.Platform != "" { + switch jdbcConfig.Platform { case PlatformMySQL, PlatformMariaDB, PlatformPostgreSQL, PlatformSQLServer, PlatformOracle: // Valid platforms default: - return fmt.Errorf("unsupported database platform: %s", jdbc.Platform) + return fmt.Errorf("unsupported database platform: %s", jdbcConfig.Platform) } } // Validate query type - if jdbc.QueryType != "" { - switch jdbc.QueryType { + if jdbcConfig.QueryType != "" { + switch jdbcConfig.QueryType { case QueryTypeOneRow, QueryTypeMultiRow, QueryTypeColumns, QueryTypeRunScript: // Valid query types default: - return fmt.Errorf("unsupported query type: %s", jdbc.QueryType) + return fmt.Errorf("unsupported query type: %s", jdbcConfig.QueryType) } } // Validate SQL for most query types - if jdbc.QueryType != QueryTypeRunScript && jdbc.SQL == "" { - return fmt.Errorf("SQL is required for query type: %s", 
jdbc.QueryType) + if jdbcConfig.QueryType != QueryTypeRunScript && jdbcConfig.SQL == "" { + return fmt.Errorf("SQL is required for query type: %s", jdbcConfig.QueryType) } return nil @@ -123,27 +138,35 @@ func (jc *JDBCCollector) PreCheck(metrics *jobtypes.Metrics) error { // Collect performs JDBC metrics collection func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData { startTime := time.Now() - jdbc := metrics.JDBC - jc.logger.Info("starting JDBC collection", - "host", jdbc.Host, - "port", jdbc.Port, - "platform", jdbc.Platform, - "database", jdbc.Database, - "queryType", jdbc.QueryType) + // Extract JDBC configuration + jdbcConfig, err := extractJDBCConfig(metrics.JDBC) + if err != nil { + jc.logger.Error(err, "failed to extract JDBC config") + return jc.createFailResponse(metrics, CodeFail, fmt.Sprintf("JDBC config error: %v", err)) + } + if jdbcConfig == nil { + return jc.createFailResponse(metrics, CodeFail, "JDBC configuration is required") + } + + // Debug level only for collection start + jc.logger.V(1).Info("starting JDBC collection", + "host", jdbcConfig.Host, + "platform", jdbcConfig.Platform, + "queryType", jdbcConfig.QueryType) // Get timeout - timeout := jc.getTimeout(jdbc.Timeout) + timeout := jc.getTimeout(jdbcConfig.Timeout) // Get database URL - databaseURL, err := jc.constructDatabaseURL(jdbc) + databaseURL, err := jc.constructDatabaseURL(jdbcConfig) if err != nil { jc.logger.Error(err, "failed to construct database URL") return jc.createFailResponse(metrics, CodeFail, fmt.Sprintf("Database URL error: %v", err)) } // Create database connection - db, err := jc.getConnection(databaseURL, jdbc.Username, jdbc.Password, timeout) + db, err := jc.getConnection(databaseURL, jdbcConfig.Username, jdbcConfig.Password, timeout) if err != nil { jc.logger.Error(err, "failed to connect to database") return jc.createFailResponse(metrics, CodeUnConnectable, fmt.Sprintf("Connection error: %v", err)) @@ -153,26 +176,27 @@ func 
(jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRep // Execute query based on type with context response := jc.createSuccessResponse(metrics) - switch jdbc.QueryType { + switch jdbcConfig.QueryType { case QueryTypeOneRow: - err = jc.queryOneRow(db, jdbc.SQL, metrics.Aliasfields, response) + err = jc.queryOneRow(db, jdbcConfig.SQL, metrics.Aliasfields, response) case QueryTypeMultiRow: - err = jc.queryMultiRow(db, jdbc.SQL, metrics.Aliasfields, response) + err = jc.queryMultiRow(db, jdbcConfig.SQL, metrics.Aliasfields, response) case QueryTypeColumns: - err = jc.queryColumns(db, jdbc.SQL, metrics.Aliasfields, response) + err = jc.queryColumns(db, jdbcConfig.SQL, metrics.Aliasfields, response) case QueryTypeRunScript: - err = jc.runScript(db, jdbc.SQL, response) + err = jc.runScript(db, jdbcConfig.SQL, response) default: - err = fmt.Errorf("unsupported query type: %s", jdbc.QueryType) + err = fmt.Errorf("unsupported query type: %s", jdbcConfig.QueryType) } if err != nil { - jc.logger.Error(err, "query execution failed", "queryType", jdbc.QueryType) + jc.logger.Error(err, "query execution failed", "queryType", jdbcConfig.QueryType) return jc.createFailResponse(metrics, CodeFail, fmt.Sprintf("Query error: %v", err)) } duration := time.Since(startTime) - jc.logger.Info("JDBC collection completed", + // Debug level only for successful completion + jc.logger.V(1).Info("JDBC collection completed", "duration", duration, "rowCount", len(response.Values)) @@ -195,9 +219,8 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string) time.Duration { return duration } - // If it's a pure number, treat it as seconds (more reasonable for database connections) if timeout, err := strconv.Atoi(timeoutStr); err == nil { - return time.Duration(timeout) * time.Second + return time.Duration(timeout) * time.Millisecond } return 30 * time.Second // fallback to default @@ -205,10 +228,6 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string) time.Duration { // 
constructDatabaseURL constructs the database connection URL func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol) (string, error) { - // If URL is provided directly, use it - if jdbc.URL != "" { - return jdbc.URL, nil - } // Construct URL based on platform host := jdbc.Host @@ -217,7 +236,8 @@ func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol) (stri switch jdbc.Platform { case PlatformMySQL, PlatformMariaDB: - return fmt.Sprintf("mysql://%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4", + // MySQL DSN format: [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN] + return fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4", jdbc.Username, jdbc.Password, host, port, database), nil case PlatformPostgreSQL: return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", @@ -234,14 +254,13 @@ func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol) (stri func (jc *JDBCCollector) getConnection(databaseURL, username, password string, timeout time.Duration) (*sql.DB, error) { // Extract driver name from URL var driverName string - if strings.HasPrefix(databaseURL, "mysql://") { - driverName = "mysql" - // Remove mysql:// prefix for the driver - databaseURL = strings.TrimPrefix(databaseURL, "mysql://") - } else if strings.HasPrefix(databaseURL, "postgres://") { + if strings.HasPrefix(databaseURL, "postgres://") { driverName = "postgres" } else if strings.HasPrefix(databaseURL, "sqlserver://") { driverName = "sqlserver" + } else if strings.Contains(databaseURL, "@tcp(") { + // MySQL DSN format (no protocol prefix) + driverName = "mysql" } else { return nil, fmt.Errorf("unsupported database URL format: %s", databaseURL) } diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go b/internal/collector/common/collect/dispatch/metrics_collector.go index 85e7580..bc3ea2e 100644 --- a/internal/collector/common/collect/dispatch/metrics_collector.go +++ 
b/internal/collector/common/collect/dispatch/metrics_collector.go @@ -53,8 +53,8 @@ func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job *jobty go func() { defer close(resultChan) - mc.logger.Info("starting metrics collection", - "jobID", job.ID, + // Debug level only for collection start + mc.logger.V(1).Info("starting metrics collection", "metricsName", metrics.Name, "protocol", metrics.Protocol) @@ -72,7 +72,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job *jobty return } - // Perform the actual collection + // Perform the actual collection with metrics (parameters should already be replaced by CommonDispatcher) result := collector.Collect(metrics) // Enrich result with job information @@ -101,9 +101,9 @@ func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job *jobty duration := time.Since(startTime) + // Only log failures at INFO level, success at debug level if result != nil && result.Code == http.StatusOK { - mc.logger.Info("metrics collection completed successfully", - "jobID", job.ID, + mc.logger.V(1).Info("metrics collection completed successfully", "metricsName", metrics.Name, "protocol", metrics.Protocol, "duration", duration, diff --git a/internal/collector/common/collect/lazy_message_router.go b/internal/collector/common/collect/lazy_message_router.go new file mode 100644 index 0000000..b12137d --- /dev/null +++ b/internal/collector/common/collect/lazy_message_router.go @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package collect + +import ( + "bytes" + "encoding/binary" + "fmt" + + "github.com/apache/arrow/go/v13/arrow" + "github.com/apache/arrow/go/v13/arrow/array" + "github.com/apache/arrow/go/v13/arrow/ipc" + "github.com/apache/arrow/go/v13/arrow/memory" + + pb "hertzbeat.apache.org/hertzbeat-collector-go/api" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// TransportRunner interface for getting transport client +type TransportRunner interface { + GetClient() transport.TransportClient + IsConnected() bool +} + +// LazyMessageRouter is a message router that lazily obtains transport client +// Solves the startup order dependency between transport client and job runner +type LazyMessageRouter struct { + transportRunner TransportRunner + logger logger.Logger + identity string +} + +// NewLazyMessageRouter creates a new lazy message router +func NewLazyMessageRouter(transportRunner TransportRunner, logger logger.Logger, identity string) MessageRouter { + if identity == "" { + identity = "collector-go" // Default identity + } + + return &LazyMessageRouter{ + transportRunner: transportRunner, + logger: logger.WithName("lazy-message-router"), + identity: identity, + } +} + +// SendResult implements MessageRouter interface +func (l *LazyMessageRouter) SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error { + // Get transport client + client := 
l.transportRunner.GetClient() + if client == nil || !client.IsStarted() { + l.logger.V(1).Info("transport client not ready, dropping result", + "jobID", job.ID, + "metricsName", data.Metrics, + "isCyclic", job.IsCyclic) + return fmt.Errorf("transport client not ready") + } + + // Send result directly + return l.sendResultDirectly(data, job, client) +} + +// sendResultDirectly sends result directly to manager +func (l *LazyMessageRouter) sendResultDirectly(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job, client transport.TransportClient) error { + // Determine message type + var msgType pb.MessageType + if job.IsCyclic { + msgType = pb.MessageType_RESPONSE_CYCLIC_TASK_DATA + } else { + msgType = pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA + } + + // Serialize data to Arrow format + dataBytes, err := l.serializeToArrow([]*jobtypes.CollectRepMetricsData{data}) + if err != nil { + l.logger.Error(err, "failed to create arrow format", + "jobID", job.ID, + "metricsName", data.Metrics) + return fmt.Errorf("failed to create arrow format: %w", err) + } + + // Create message - collector sends data to Manager as RESPONSE (response to task) + msg := &pb.Message{ + Type: msgType, + Direction: pb.Direction_RESPONSE, + Identity: l.identity, + Msg: dataBytes, + } + + // Send message + if err := client.SendMsg(msg); err != nil { + l.logger.Error(err, "failed to send metrics data", + "jobID", job.ID, + "metricsName", data.Metrics, + "messageType", msgType) + return fmt.Errorf("failed to send metrics data: %w", err) + } + + l.logger.Info("successfully sent metrics data", + "jobID", job.ID, + "metricsName", data.Metrics, + "isCyclic", job.IsCyclic, + "messageType", msgType, + "dataSize", len(dataBytes), + "direction", msg.Direction, + "identity", msg.Identity) + + // Add detailed debugging information + l.logger.Info("message details for debugging", + "msgType", int(msgType), + "direction", int(msg.Direction), + "identity", msg.Identity, + "dataLength", len(dataBytes)) + + 
return nil +} + +// serializeToArrow serializes data using Apache Arrow format, compatible with Java Manager +func (l *LazyMessageRouter) serializeToArrow(dataList []*jobtypes.CollectRepMetricsData) ([]byte, error) { + var mainBuf bytes.Buffer + + // Write root count (format expected by Java Manager) + rootCount := int32(len(dataList)) + if err := binary.Write(&mainBuf, binary.BigEndian, rootCount); err != nil { + return nil, fmt.Errorf("failed to write root count: %w", err) + } + + mem := memory.NewGoAllocator() + + // Create separate Arrow stream for each data item + for i, data := range dataList { + recordBatch, err := l.createArrowRecordBatch(mem, data) + if err != nil { + return nil, fmt.Errorf("failed to create record batch for data %d: %w", i, err) + } + + // Create Arrow stream + var streamBuf bytes.Buffer + writer := ipc.NewWriter(&streamBuf, ipc.WithSchema(recordBatch.Schema())) + if err := writer.Write(recordBatch); err != nil { + recordBatch.Release() + writer.Close() + return nil, fmt.Errorf("failed to write record batch %d: %w", i, err) + } + if err := writer.Close(); err != nil { + recordBatch.Release() + return nil, fmt.Errorf("failed to close writer for batch %d: %w", i, err) + } + recordBatch.Release() + + // Write stream data to main buffer + streamData := streamBuf.Bytes() + if _, err := mainBuf.Write(streamData); err != nil { + return nil, fmt.Errorf("failed to write stream data for batch %d: %w", i, err) + } + } + + return mainBuf.Bytes(), nil +} + +// createArrowRecordBatch creates Arrow RecordBatch for single MetricsData, compatible with Java Manager +func (l *LazyMessageRouter) createArrowRecordBatch(mem memory.Allocator, data *jobtypes.CollectRepMetricsData) (arrow.Record, error) { + // Create metadata for fields (all fields use the same default metadata) + emptyMetadata := arrow.MetadataFrom(map[string]string{ + "type": "1", + "label": "false", + "unit": "none", + }) + + // Define metadata fields (fixed fields) + metadataFields := 
[]arrow.Field{ + {Name: "app", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: emptyMetadata}, + {Name: "metrics", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: emptyMetadata}, + {Name: "id", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: emptyMetadata}, + {Name: "monitorId", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: emptyMetadata}, + {Name: "tenantId", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: emptyMetadata}, + {Name: "priority", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: emptyMetadata}, + {Name: "time", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: emptyMetadata}, + {Name: "code", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: emptyMetadata}, + {Name: "msg", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: emptyMetadata}, + } + + // Add dynamic fields (based on collected field definitions) + dynamicFields := make([]arrow.Field, 0, len(data.Fields)) + for _, field := range data.Fields { + // Ensure unit is not empty + unitValue := field.Unit + if unitValue == "" { + unitValue = "none" + } + + // Create field metadata + typeValue := fmt.Sprintf("%d", field.Type) + labelValue := fmt.Sprintf("%t", field.Label) + + fieldMetadata := arrow.MetadataFrom(map[string]string{ + "type": typeValue, + "label": labelValue, + "unit": unitValue, + }) + + dynamicFields = append(dynamicFields, arrow.Field{ + Name: field.Field, + Type: arrow.BinaryTypes.String, + Nullable: true, + Metadata: fieldMetadata, + }) + } + + // Merge all fields + allFields := make([]arrow.Field, 0, len(metadataFields)+len(dynamicFields)) + allFields = append(allFields, metadataFields...) + allFields = append(allFields, dynamicFields...) 
+ + // Create schema-level metadata + schemaMetadata := arrow.MetadataFrom(map[string]string{ + "id": fmt.Sprintf("%d", data.ID), + "tenantId": fmt.Sprintf("%d", data.TenantID), + "app": data.App, + "metrics": data.Metrics, + "priority": fmt.Sprintf("%d", data.Priority), + "time": fmt.Sprintf("%d", data.Time), + "labels": "", + "annotations": "", + }) + + schema := arrow.NewSchema(allFields, &schemaMetadata) + + // Create builders (all fields are String type) + builders := make([]array.Builder, len(allFields)) + for i, field := range allFields { + builders[i] = array.NewBuilder(mem, field.Type) + } + defer func() { + for _, builder := range builders { + builder.Release() + } + }() + + // Determine row count + rowCount := len(data.Values) + if rowCount == 0 { + rowCount = 1 // At least one row of metadata + } + + // Fill data + for rowIdx := 0; rowIdx < rowCount; rowIdx++ { + // Fill metadata fields + builders[0].(*array.StringBuilder).Append(data.App) + builders[1].(*array.StringBuilder).Append(data.Metrics) + builders[2].(*array.StringBuilder).Append(fmt.Sprintf("%d", data.ID)) + builders[3].(*array.StringBuilder).Append(fmt.Sprintf("%d", data.MonitorID)) + builders[4].(*array.StringBuilder).Append(fmt.Sprintf("%d", data.TenantID)) + builders[5].(*array.StringBuilder).Append(fmt.Sprintf("%d", data.Priority)) + builders[6].(*array.StringBuilder).Append(fmt.Sprintf("%d", data.Time)) + builders[7].(*array.StringBuilder).Append(fmt.Sprintf("%d", data.Code)) + builders[8].(*array.StringBuilder).Append(data.Msg) + + // Fill dynamic field data + if rowIdx < len(data.Values) { + valueRow := data.Values[rowIdx] + for i, _ := range data.Fields { + builderIdx := len(metadataFields) + i + var value string + if i < len(valueRow.Columns) { + value = valueRow.Columns[i] + } + builders[builderIdx].(*array.StringBuilder).Append(value) + } + } else { + // Fill empty values + for i := range data.Fields { + builderIdx := len(metadataFields) + i + 
builders[builderIdx].(*array.StringBuilder).Append("") + } + } + } + + // Build arrays + arrays := make([]arrow.Array, len(builders)) + for i, builder := range builders { + arrays[i] = builder.NewArray() + defer arrays[i].Release() + } + + // Create Record + record := array.NewRecord(schema, arrays, int64(rowCount)) + return record, nil +} + +// createUnifiedArrowRecordBatch creates a unified RecordBatch containing all MetricsData +func (l *LazyMessageRouter) createUnifiedArrowRecordBatch(mem memory.Allocator, dataList []*jobtypes.CollectRepMetricsData) (arrow.Record, error) { + if len(dataList) == 0 { + return nil, fmt.Errorf("empty data list") + } + + // Use the first data item to define schema + firstData := dataList[0] + + // Define basic fields - simplified schema, only core fields + fields := []arrow.Field{ + {Name: "id", Type: arrow.PrimitiveTypes.Int64}, + {Name: "monitorId", Type: arrow.PrimitiveTypes.Int64}, + {Name: "app", Type: arrow.BinaryTypes.String}, + {Name: "metrics", Type: arrow.BinaryTypes.String}, + {Name: "time", Type: arrow.PrimitiveTypes.Int64}, + {Name: "code", Type: arrow.PrimitiveTypes.Int32}, + {Name: "msg", Type: arrow.BinaryTypes.String}, + } + + // Add value fields - based on first data item's Fields + if len(firstData.Fields) > 0 { + for _, field := range firstData.Fields { + fields = append(fields, arrow.Field{ + Name: field.Field, + Type: arrow.BinaryTypes.String, + }) + } + } + + schema := arrow.NewSchema(fields, nil) + + // Create builders + builders := make([]array.Builder, len(fields)) + for i, field := range fields { + switch field.Type { + case arrow.PrimitiveTypes.Int64: + builders[i] = array.NewInt64Builder(mem) + case arrow.PrimitiveTypes.Int32: + builders[i] = array.NewInt32Builder(mem) + default: + builders[i] = array.NewStringBuilder(mem) + } + } + + // Calculate total rows + totalRows := 0 + for _, data := range dataList { + if len(data.Values) > 0 { + totalRows += len(data.Values) + } else { + totalRows += 1 // At 
least one row of metadata + } + } + + // Fill data + for _, data := range dataList { + rowCount := 1 + if len(data.Values) > 0 { + rowCount = len(data.Values) + } + + for row := 0; row < rowCount; row++ { + // Basic fields + builders[0].(*array.Int64Builder).Append(data.ID) + builders[1].(*array.Int64Builder).Append(data.MonitorID) + builders[2].(*array.StringBuilder).Append(data.App) + builders[3].(*array.StringBuilder).Append(data.Metrics) + builders[4].(*array.Int64Builder).Append(data.Time) + builders[5].(*array.Int32Builder).Append(int32(data.Code)) + builders[6].(*array.StringBuilder).Append(data.Msg) + + // Value fields + if len(data.Values) > row && len(data.Fields) > 0 { + valueRow := data.Values[row] + for i := range data.Fields { + builderIndex := 7 + i // Index after basic fields + if builderIndex < len(builders) { + if i < len(valueRow.Columns) { + builders[builderIndex].(*array.StringBuilder).Append(valueRow.Columns[i]) + } else { + builders[builderIndex].(*array.StringBuilder).Append("") + } + } + } + } else { + // Fill empty values + for i := 0; i < len(firstData.Fields); i++ { + builderIndex := 7 + i + if builderIndex < len(builders) { + builders[builderIndex].(*array.StringBuilder).Append("") + } + } + } + } + } + + // Build arrays + arrays := make([]arrow.Array, len(builders)) + for i, builder := range builders { + arrays[i] = builder.NewArray() + defer arrays[i].Release() + builder.Release() + } + + // Create RecordBatch + record := array.NewRecord(schema, arrays, int64(totalRows)) + return record, nil +} diff --git a/internal/collector/common/collect/result_handler.go b/internal/collector/common/collect/result_handler.go index 7f5a304..22ceb1f 100644 --- a/internal/collector/common/collect/result_handler.go +++ b/internal/collector/common/collect/result_handler.go @@ -52,9 +52,8 @@ func (rh *ResultHandlerImpl) HandleCollectData(data *jobtypes.CollectRepMetricsD return fmt.Errorf("collect data is nil") } - rh.logger.Info("handling collect data", 
- "jobID", job.ID, - "monitorID", job.MonitorID, + // Debug level only for data handling + rh.logger.V(1).Info("handling collect data", "metricsName", data.Metrics, "code", data.Code, "valuesCount", len(data.Values)) @@ -67,9 +66,9 @@ func (rh *ResultHandlerImpl) HandleCollectData(data *jobtypes.CollectRepMetricsD // 4. Triggering alerts based on thresholds // 5. Updating monitoring status + // Only log failures at INFO level, success at debug level if data.Code == http.StatusOK { - rh.logger.Info("successfully processed collect data", - "jobID", job.ID, + rh.logger.V(1).Info("successfully processed collect data", "metricsName", data.Metrics) } else { rh.logger.Info("received failed collect data", diff --git a/internal/collector/common/dispatcher/common_dispatcher.go b/internal/collector/common/dispatcher/common_dispatcher.go index 137304a..8d37be5 100644 --- a/internal/collector/common/dispatcher/common_dispatcher.go +++ b/internal/collector/common/dispatcher/common_dispatcher.go @@ -28,6 +28,7 @@ import ( "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" ) // CommonDispatcherImpl is responsible for breaking down jobs into individual metrics collection tasks @@ -59,9 +60,9 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx context.Context, job *jo return fmt.Errorf("job cannot be nil") } - cd.logger.Info("dispatching metrics task", + // Dispatching metrics task - only log at debug level + cd.logger.V(1).Info("dispatching metrics task", "jobID", job.ID, - "monitorID", job.MonitorID, "app", job.App, "metricsCount", len(job.Metrics)) @@ -74,6 +75,16 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx context.Context, job *jo return nil } + // Replace parameters in job configuration ONCE before 
concurrent collection + // This avoids concurrent map access issues in MetricsCollector + paramReplacer := param.NewReplacer() + processedJob, err := paramReplacer.ReplaceJobParams(job) + if err != nil { + cd.logger.Error(err, "failed to replace job parameters", + "jobID", job.ID) + return fmt.Errorf("parameter replacement failed: %w", err) + } + // Create collection context with timeout collectCtx, collectCancel := context.WithTimeout(ctx, cd.getCollectionTimeout(job)) defer collectCancel() @@ -82,13 +93,26 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx context.Context, job *jo resultChannels := make([]chan *jobtypes.CollectRepMetricsData, len(metricsToCollect)) for i, metrics := range metricsToCollect { - cd.logger.Info("starting metrics collection", - "jobID", job.ID, + // Starting metrics collection - debug level only + cd.logger.V(1).Info("starting metrics collection", "metricsName", metrics.Name, "protocol", metrics.Protocol) - // Start metrics collection in goroutine - resultChannels[i] = cd.metricsCollector.CollectMetrics(metrics, job, timeout) + // Find the processed metrics in the processed job + var processedMetrics *jobtypes.Metrics + for j := range processedJob.Metrics { + if processedJob.Metrics[j].Name == metrics.Name { + processedMetrics = &processedJob.Metrics[j] + break + } + } + if processedMetrics == nil { + cd.logger.Error(nil, "processed metrics not found", "metricsName", metrics.Name) + continue + } + + // Start metrics collection in goroutine with processed data + resultChannels[i] = cd.metricsCollector.CollectMetrics(processedMetrics, processedJob, timeout) } // Collect results from all metrics collection tasks @@ -100,8 +124,8 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx context.Context, job *jo case result := <-resultChan: if result != nil { results = append(results, result) - cd.logger.Info("received metrics result", - "jobID", job.ID, + // Metrics result received - debug level only + 
cd.logger.V(1).Info("received metrics result", "metricsName", metricsToCollect[i].Name, "code", result.Code) } @@ -123,18 +147,26 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx context.Context, job *jo return err } - cd.logger.Info("successfully dispatched metrics task", - "jobID", job.ID, - "resultsCount", len(results), - "errorsCount", len(errors), - "duration", duration) + // Only log summary for successful tasks if there were errors + if len(errors) > 0 { + cd.logger.Info("metrics task completed with errors", + "jobID", job.ID, + "errorsCount", len(errors), + "duration", duration) + } else { + cd.logger.V(1).Info("metrics task completed successfully", + "jobID", job.ID, + "resultsCount", len(results), + "duration", duration) + } return nil } // handleResults processes the collection results and decides on next actions func (cd *CommonDispatcherImpl) handleResults(results []*jobtypes.CollectRepMetricsData, job *jobtypes.Job, errors []error) error { - cd.logger.Info("handling collection results", + // Debug level only for result handling + cd.logger.V(1).Info("handling collection results", "jobID", job.ID, "resultsCount", len(results), "errorsCount", len(errors)) @@ -163,7 +195,8 @@ func (cd *CommonDispatcherImpl) handleResults(results []*jobtypes.CollectRepMetr // evaluateNextActions decides what to do next based on collection results func (cd *CommonDispatcherImpl) evaluateNextActions(job *jobtypes.Job, results []*jobtypes.CollectRepMetricsData, errors []error) { - cd.logger.Info("evaluating next actions", + // Debug level only for next actions evaluation + cd.logger.V(1).Info("evaluating next actions", "jobID", job.ID, "resultsCount", len(results), "errorsCount", len(errors)) @@ -172,13 +205,13 @@ func (cd *CommonDispatcherImpl) evaluateNextActions(job *jobtypes.Job, results [ hasNextLevel := cd.hasNextLevelMetrics(job, results) if hasNextLevel { - cd.logger.Info("job has next level metrics to collect", "jobID", job.ID) + cd.logger.V(1).Info("job 
has next level metrics to collect", "jobID", job.ID) // TODO: Schedule next level metrics collection } // For cyclic jobs, the rescheduling is handled by WheelTimerTask if job.IsCyclic { - cd.logger.Info("cyclic job will be rescheduled by timer", "jobID", job.ID) + cd.logger.V(1).Info("cyclic job will be rescheduled by timer", "jobID", job.ID) } // TODO: Send results to data queue for further processing diff --git a/internal/collector/common/dispatcher/hashed_wheel_timer.go b/internal/collector/common/dispatcher/hashed_wheel_timer.go index 3ebe931..9100205 100644 --- a/internal/collector/common/dispatcher/hashed_wheel_timer.go +++ b/internal/collector/common/dispatcher/hashed_wheel_timer.go @@ -100,7 +100,8 @@ func (hwt *hashedWheelTimer) NewTimeout(task jobtypes.TimerTask, delay time.Dura bucket.timeouts = append(bucket.timeouts, timeout) bucket.mu.Unlock() - hwt.logger.Info("created timeout", + // Log timeout creation only for debugging + hwt.logger.V(1).Info("created timeout", "delay", delay, "bucketIndex", bucketIndex, "targetTick", targetTick) @@ -114,9 +115,7 @@ func (hwt *hashedWheelTimer) Start(ctx context.Context) error { return nil // already started } - hwt.logger.Info("starting hashed wheel timer", - "wheelSize", hwt.wheelSize, - "tickDuration", hwt.tickDuration) + hwt.logger.Info("starting hashed wheel timer") hwt.startTime = time.Now() hwt.ticker = time.NewTicker(hwt.tickDuration) @@ -138,8 +137,6 @@ func (hwt *hashedWheelTimer) Stop() error { return nil // already stopped } - hwt.logger.Info("stopping hashed wheel timer") - hwt.cancel() if hwt.ticker != nil { @@ -174,6 +171,14 @@ func (hwt *hashedWheelTimer) tick() { bucket := hwt.wheel[bucketIndex] bucket.mu.Lock() + // Only log tick processing if there are timeouts to process + if len(bucket.timeouts) > 0 { + hwt.logger.V(1).Info("processing tick", + "currentTick", currentTick, + "bucketIndex", bucketIndex, + "timeoutsCount", len(bucket.timeouts)) + } + // Process expired timeouts var remaining 
[]*jobtypes.Timeout for _, timeout := range bucket.timeouts { @@ -181,8 +186,18 @@ func (hwt *hashedWheelTimer) tick() { continue // skip cancelled timeouts } - // Check if timeout has expired - if time.Now().After(timeout.Deadline()) { + now := time.Now() + deadline := timeout.Deadline() + + // Check if timeout should execute based on wheel position + // A timeout should execute when currentTick >= its targetTick + wheelIndex := timeout.WheelIndex() + if currentTick >= int64(wheelIndex) { + // Only log actual task execution at INFO level + hwt.logger.Info("executing scheduled task", + "currentTick", currentTick, + "targetTick", wheelIndex) + // Execute the timeout task asynchronously go func(t *jobtypes.Timeout) { defer func() { @@ -196,7 +211,12 @@ func (hwt *hashedWheelTimer) tick() { } }(timeout) } else { - // Not yet expired, keep in bucket + // Not yet time to execute, keep in bucket + hwt.logger.V(1).Info("timeout not ready by wheel position", + "currentTick", currentTick, + "targetTick", wheelIndex, + "deadline", deadline, + "now", now) remaining = append(remaining, timeout) } } @@ -216,10 +236,11 @@ func (hwt *hashedWheelTimer) GetStats() map[string]interface{} { return map[string]interface{}{ "wheelSize": hwt.wheelSize, - "tickDuration": hwt.tickDuration, + "tickDuration": hwt.tickDuration.String(), "currentTick": atomic.LoadInt64(&hwt.currentTick), "totalTimeouts": totalTimeouts, "started": atomic.LoadInt32(&hwt.started) == 1, "stopped": atomic.LoadInt32(&hwt.stopped) == 1, + "wheelPeriod": (time.Duration(hwt.wheelSize) * hwt.tickDuration).String(), } } diff --git a/internal/collector/common/dispatcher/time_wheel.go b/internal/collector/common/dispatcher/time_wheel.go index 30d02ad..d8c7aa1 100644 --- a/internal/collector/common/dispatcher/time_wheel.go +++ b/internal/collector/common/dispatcher/time_wheel.go @@ -29,6 +29,16 @@ import ( "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) +// JobInfo holds information about a scheduled 
job +type JobInfo struct { + Job *jobtypes.Job + Timeout *jobtypes.Timeout + CreatedAt time.Time + LastExecutedAt time.Time + NextExecutionAt time.Time + ExecutionCount int64 +} + // TimeDispatch manages time-based job scheduling using a hashed wheel timer type TimeDispatch struct { logger logger.Logger @@ -36,8 +46,26 @@ type TimeDispatch struct { commonDispatcher MetricsTaskDispatcher cyclicTasks sync.Map // map[int64]*jobtypes.Timeout for cyclic jobs tempTasks sync.Map // map[int64]*jobtypes.Timeout for one-time jobs + jobInfos sync.Map // map[int64]*JobInfo for detailed job information started bool mu sync.RWMutex + + // Statistics + stats *DispatcherStats + statsLogger *time.Ticker + statsCancel context.CancelFunc +} + +// DispatcherStats holds dispatcher statistics +type DispatcherStats struct { + StartTime time.Time + TotalJobsAdded int64 + TotalJobsRemoved int64 + TotalJobsExecuted int64 + TotalJobsCompleted int64 + TotalJobsFailed int64 + LastExecutionTime time.Time + mu sync.RWMutex } // MetricsTaskDispatcher interface for metrics task dispatching @@ -50,6 +78,7 @@ type HashedWheelTimer interface { NewTimeout(task jobtypes.TimerTask, delay time.Duration) *jobtypes.Timeout Start(ctx context.Context) error Stop() error + GetStats() map[string]interface{} } // NewTimeDispatch creates a new time dispatcher @@ -58,6 +87,9 @@ func NewTimeDispatch(logger logger.Logger, commonDispatcher MetricsTaskDispatche logger: logger.WithName("time-dispatch"), commonDispatcher: commonDispatcher, started: false, + stats: &DispatcherStats{ + StartTime: time.Now(), + }, } // Create hashed wheel timer with reasonable defaults @@ -74,32 +106,56 @@ func (td *TimeDispatch) AddJob(job *jobtypes.Job) error { return fmt.Errorf("job cannot be nil") } - td.logger.Info("adding job to time dispatcher", + // Log job addition at debug level + td.logger.V(1).Info("adding job", "jobID", job.ID, - "isCyclic", job.IsCyclic, "interval", job.DefaultInterval) + // Update statistics + 
td.stats.mu.Lock() + td.stats.TotalJobsAdded++ + td.stats.mu.Unlock() + // Create wheel timer task - timerTask := NewWheelTimerTask(job, td.commonDispatcher, td.logger) + timerTask := NewWheelTimerTask(job, td.commonDispatcher, td, td.logger) // Calculate delay var delay time.Duration if job.DefaultInterval > 0 { - delay = time.Duration(job.DefaultInterval) * time.Second + delay = time.Duration(job.DefaultInterval) * time.Millisecond + td.logger.V(1).Info("calculated delay", + "interval", job.DefaultInterval, + "delay", delay) } else { - delay = 30 * time.Second // default interval + delay = 10 * time.Second } // Create timeout timeout := td.wheelTimer.NewTimeout(timerTask, delay) - // Store timeout based on job type + // Create job info + now := time.Now() + jobInfo := &JobInfo{ + Job: job, + Timeout: timeout, + CreatedAt: now, + NextExecutionAt: now.Add(delay), + ExecutionCount: 0, + } + + // Store timeout and job info based on job type if job.IsCyclic { td.cyclicTasks.Store(job.ID, timeout) - td.logger.Info("added cyclic job to scheduler", "jobID", job.ID, "delay", delay) + td.jobInfos.Store(job.ID, jobInfo) + td.logger.Info("scheduled cyclic job", + "jobID", job.ID, + "nextExecution", jobInfo.NextExecutionAt) } else { td.tempTasks.Store(job.ID, timeout) - td.logger.Info("added one-time job to scheduler", "jobID", job.ID, "delay", delay) + td.jobInfos.Store(job.ID, jobInfo) + td.logger.Info("scheduled one-time job", + "jobID", job.ID, + "nextExecution", jobInfo.NextExecutionAt) } return nil @@ -107,13 +163,19 @@ func (td *TimeDispatch) AddJob(job *jobtypes.Job) error { // RemoveJob removes a job from the scheduler func (td *TimeDispatch) RemoveJob(jobID int64) error { - td.logger.Info("removing job from time dispatcher", "jobID", jobID) + td.logger.V(1).Info("removing job", "jobID", jobID) + + // Update statistics + td.stats.mu.Lock() + td.stats.TotalJobsRemoved++ + td.stats.mu.Unlock() // Try to remove from cyclic tasks if timeoutInterface, exists := 
td.cyclicTasks.LoadAndDelete(jobID); exists { if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok { timeout.Cancel() - td.logger.Info("removed cyclic job", "jobID", jobID) + td.jobInfos.Delete(jobID) + td.logger.V(1).Info("removed cyclic job", "jobID", jobID) return nil } } @@ -122,7 +184,8 @@ func (td *TimeDispatch) RemoveJob(jobID int64) error { if timeoutInterface, exists := td.tempTasks.LoadAndDelete(jobID); exists { if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok { timeout.Cancel() - td.logger.Info("removed one-time job", "jobID", jobID) + td.jobInfos.Delete(jobID) + td.logger.V(1).Info("removed one-time job", "jobID", jobID) return nil } } @@ -146,8 +209,11 @@ func (td *TimeDispatch) Start(ctx context.Context) error { return fmt.Errorf("failed to start wheel timer: %w", err) } + // Start periodic statistics logging + td.startStatsLogging(ctx) + td.started = true - td.logger.Info("time dispatcher started successfully") + // Started successfully return nil } @@ -162,6 +228,9 @@ func (td *TimeDispatch) Stop() error { td.logger.Info("stopping time dispatcher") + // Stop statistics logging + td.stopStatsLogging() + // Cancel all running tasks td.cyclicTasks.Range(func(key, value interface{}) bool { if timeout, ok := value.(*jobtypes.Timeout); ok { @@ -180,6 +249,7 @@ func (td *TimeDispatch) Stop() error { // Clear maps td.cyclicTasks = sync.Map{} td.tempTasks = sync.Map{} + td.jobInfos = sync.Map{} // Stop wheel timer if err := td.wheelTimer.Stop(); err != nil { @@ -206,9 +276,230 @@ func (td *TimeDispatch) Stats() map[string]any { return true }) + td.stats.mu.RLock() + uptime := time.Since(td.stats.StartTime) + totalJobsAdded := td.stats.TotalJobsAdded + totalJobsRemoved := td.stats.TotalJobsRemoved + totalJobsExecuted := td.stats.TotalJobsExecuted + totalJobsCompleted := td.stats.TotalJobsCompleted + totalJobsFailed := td.stats.TotalJobsFailed + lastExecutionTime := td.stats.LastExecutionTime + td.stats.mu.RUnlock() + return map[string]any{ - 
"cyclicJobs": cyclicCount, - "tempJobs": tempCount, - "started": td.started, + "cyclicJobs": cyclicCount, + "tempJobs": tempCount, + "started": td.started, + "uptime": uptime, + "totalJobsAdded": totalJobsAdded, + "totalJobsRemoved": totalJobsRemoved, + "totalJobsExecuted": totalJobsExecuted, + "totalJobsCompleted": totalJobsCompleted, + "totalJobsFailed": totalJobsFailed, + "lastExecutionTime": lastExecutionTime, + } +} + +// RecordJobExecution records job execution statistics +func (td *TimeDispatch) RecordJobExecution() { + td.stats.mu.Lock() + td.stats.TotalJobsExecuted++ + td.stats.LastExecutionTime = time.Now() + td.stats.mu.Unlock() +} + +// RecordJobCompleted records job completion +func (td *TimeDispatch) RecordJobCompleted() { + td.stats.mu.Lock() + td.stats.TotalJobsCompleted++ + td.stats.mu.Unlock() +} + +// RecordJobFailed records job failure +func (td *TimeDispatch) RecordJobFailed() { + td.stats.mu.Lock() + td.stats.TotalJobsFailed++ + td.stats.mu.Unlock() +} + +// UpdateJobExecution updates job execution information +func (td *TimeDispatch) UpdateJobExecution(jobID int64) { + if jobInfoInterface, exists := td.jobInfos.Load(jobID); exists { + if jobInfo, ok := jobInfoInterface.(*JobInfo); ok { + now := time.Now() + jobInfo.LastExecutedAt = now + jobInfo.ExecutionCount++ + + // Update next execution time for cyclic jobs + if jobInfo.Job.IsCyclic && jobInfo.Job.DefaultInterval > 0 { + jobInfo.NextExecutionAt = now.Add(time.Duration(jobInfo.Job.DefaultInterval) * time.Second) + } + } } } + +// RescheduleJob reschedules a cyclic job for the next execution +func (td *TimeDispatch) RescheduleJob(job *jobtypes.Job) error { + if !job.IsCyclic || job.DefaultInterval <= 0 { + return fmt.Errorf("job is not cyclable or has invalid interval") + } + + // Create new timer task for next execution + nextDelay := time.Duration(job.DefaultInterval) * time.Second + timerTask := NewWheelTimerTask(job, td.commonDispatcher, td, td.logger) + + // Schedule the next execution 
+ timeout := td.wheelTimer.NewTimeout(timerTask, nextDelay) + + // Update the timeout in cyclicTasks + td.cyclicTasks.Store(job.ID, timeout) + + // Update job info next execution time + if jobInfoInterface, exists := td.jobInfos.Load(job.ID); exists { + if jobInfo, ok := jobInfoInterface.(*JobInfo); ok { + jobInfo.NextExecutionAt = time.Now().Add(nextDelay) + jobInfo.Timeout = timeout + } + } + + nextExecutionTime := time.Now().Add(nextDelay) + td.logger.Info("rescheduled cyclic job", + "jobID", job.ID, + "interval", job.DefaultInterval, + "nextExecution", nextExecutionTime, + "delay", nextDelay) + + return nil +} + +// startStatsLogging starts periodic statistics logging +func (td *TimeDispatch) startStatsLogging(ctx context.Context) { + // Create a context that can be cancelled + statsCtx, cancel := context.WithCancel(ctx) + td.statsCancel = cancel + + // Create ticker for 1-minute intervals + td.statsLogger = time.NewTicker(1 * time.Minute) + + go func() { + defer td.statsLogger.Stop() + + // Log initial statistics + td.logStatistics() + + for { + select { + case <-td.statsLogger.C: + td.logStatistics() + case <-statsCtx.Done(): + td.logger.Info("statistics logging stopped") + return + } + } + }() + + td.logger.Info("statistics logging started", "interval", "1m") +} + +// stopStatsLogging stops periodic statistics logging +func (td *TimeDispatch) stopStatsLogging() { + if td.statsCancel != nil { + td.statsCancel() + td.statsCancel = nil + } + if td.statsLogger != nil { + td.statsLogger.Stop() + td.statsLogger = nil + } +} + +// logStatistics logs current dispatcher statistics +func (td *TimeDispatch) logStatistics() { + stats := td.Stats() + + // Get detailed job information + var cyclicJobDetails []map[string]any + var tempJobDetails []map[string]any + + td.cyclicTasks.Range(func(key, value interface{}) bool { + if jobID, ok := key.(int64); ok { + if jobInfoInterface, exists := td.jobInfos.Load(jobID); exists { + if jobInfo, ok := jobInfoInterface.(*JobInfo); 
ok { + now := time.Now() + detail := map[string]any{ + "jobID": jobID, + "app": jobInfo.Job.App, + "monitorID": jobInfo.Job.MonitorID, + "interval": jobInfo.Job.DefaultInterval, + "createdAt": jobInfo.CreatedAt, + "lastExecuted": jobInfo.LastExecutedAt, + "nextExecution": jobInfo.NextExecutionAt, + "execCount": jobInfo.ExecutionCount, + "timeSinceLastExec": func() string { + if jobInfo.LastExecutedAt.IsZero() { + return "never" + } + return now.Sub(jobInfo.LastExecutedAt).String() + }(), + "timeToNextExec": func() string { + if jobInfo.NextExecutionAt.IsZero() { + return "unknown" + } + if jobInfo.NextExecutionAt.Before(now) { + return "overdue" + } + return jobInfo.NextExecutionAt.Sub(now).String() + }(), + } + cyclicJobDetails = append(cyclicJobDetails, detail) + } + } + } + return true + }) + + td.tempTasks.Range(func(key, value interface{}) bool { + if jobID, ok := key.(int64); ok { + if jobInfoInterface, exists := td.jobInfos.Load(jobID); exists { + if jobInfo, ok := jobInfoInterface.(*JobInfo); ok { + detail := map[string]any{ + "jobID": jobID, + "app": jobInfo.Job.App, + "monitorID": jobInfo.Job.MonitorID, + "createdAt": jobInfo.CreatedAt, + "nextExecution": jobInfo.NextExecutionAt, + "execCount": jobInfo.ExecutionCount, + } + tempJobDetails = append(tempJobDetails, detail) + } + } + } + return true + }) + + // Get timer wheel statistics + var timerStats map[string]interface{} + if td.wheelTimer != nil { + timerStats = td.wheelTimer.GetStats() + } + + td.logger.Info("📊 Dispatcher Statistics Report", + "uptime", stats["uptime"], + "activeJobs", map[string]any{ + "cyclic": stats["cyclicJobs"], + "temp": stats["tempJobs"], + "total": stats["cyclicJobs"].(int) + stats["tempJobs"].(int), + }, + "jobCounters", map[string]any{ + "added": stats["totalJobsAdded"], + "removed": stats["totalJobsRemoved"], + "executed": stats["totalJobsExecuted"], + "completed": stats["totalJobsCompleted"], + "failed": stats["totalJobsFailed"], + }, + "timerWheelConfig", timerStats, + 
"lastExecution", stats["lastExecutionTime"], + "cyclicJobs", cyclicJobDetails, + "tempJobs", tempJobDetails, + ) +} diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go b/internal/collector/common/dispatcher/wheel_timer_task.go index 3eb53e3..657103a 100644 --- a/internal/collector/common/dispatcher/wheel_timer_task.go +++ b/internal/collector/common/dispatcher/wheel_timer_task.go @@ -28,19 +28,30 @@ import ( "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) +// StatsRecorder interface for recording job execution statistics +type StatsRecorder interface { + RecordJobExecution() + RecordJobCompleted() + RecordJobFailed() + UpdateJobExecution(jobID int64) + RescheduleJob(job *jobtypes.Job) error +} + // WheelTimerTask represents a task that runs in the time wheel // This corresponds to Java's WheelTimerTask type WheelTimerTask struct { job *jobtypes.Job commonDispatcher MetricsTaskDispatcher + statsRecorder StatsRecorder logger logger.Logger } // NewWheelTimerTask creates a new wheel timer task -func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher MetricsTaskDispatcher, logger logger.Logger) *WheelTimerTask { +func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher MetricsTaskDispatcher, statsRecorder StatsRecorder, logger logger.Logger) *WheelTimerTask { return &WheelTimerTask{ job: job, commonDispatcher: commonDispatcher, + statsRecorder: statsRecorder, logger: logger.WithName("wheel-timer-task"), } } @@ -53,11 +64,16 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error { return fmt.Errorf("job is nil, cannot execute") } - wtt.logger.Info("executing wheel timer task", + wtt.logger.V(1).Info("executing task", "jobID", wtt.job.ID, - "monitorID", wtt.job.MonitorID, "app", wtt.job.App) + // Record job execution start + if wtt.statsRecorder != nil { + wtt.statsRecorder.RecordJobExecution() + wtt.statsRecorder.UpdateJobExecution(wtt.job.ID) + } + startTime := time.Now() // Create a context for this specific task 
execution @@ -74,13 +90,23 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error { wtt.logger.Error(err, "failed to dispatch metrics task", "jobID", wtt.job.ID, "duration", duration) + + // Record job failure + if wtt.statsRecorder != nil { + wtt.statsRecorder.RecordJobFailed() + } return err } - wtt.logger.Info("successfully dispatched metrics task", + wtt.logger.V(1).Info("task completed", "jobID", wtt.job.ID, "duration", duration) + // Record job completion + if wtt.statsRecorder != nil { + wtt.statsRecorder.RecordJobCompleted() + } + // If this is a cyclic job, reschedule it for the next execution if wtt.job.IsCyclic && wtt.job.DefaultInterval > 0 { wtt.rescheduleJob(timeout) @@ -91,17 +117,20 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error { // rescheduleJob reschedules a cyclic job for the next execution func (wtt *WheelTimerTask) rescheduleJob(timeout *jobtypes.Timeout) { - wtt.logger.Info("rescheduling cyclic job", - "jobID", wtt.job.ID, - "interval", wtt.job.DefaultInterval) + if wtt.job.DefaultInterval <= 0 { + wtt.logger.Info("job has no valid interval, not rescheduling", + "jobID", wtt.job.ID) + return + } - // TODO: Implement rescheduling logic - // This would involve creating a new timeout with the same job - // and adding it back to the time wheel - // For now, we'll log that rescheduling is needed - wtt.logger.Info("cyclic job needs rescheduling", - "jobID", wtt.job.ID, - "nextExecutionIn", time.Duration(wtt.job.DefaultInterval)*time.Second) + // Use the stats recorder to reschedule the job + if wtt.statsRecorder != nil { + if err := wtt.statsRecorder.RescheduleJob(wtt.job); err != nil { + wtt.logger.Error(err, "failed to reschedule job", "jobID", wtt.job.ID) + } + } else { + wtt.logger.Error(nil, "stats recorder is nil", "jobID", wtt.job.ID) + } } // GetJob returns the associated job diff --git a/internal/collector/common/router/message_router.go b/internal/collector/common/router/message_router.go new file mode 
100644 index 0000000..262dff7 --- /dev/null +++ b/internal/collector/common/router/message_router.go @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package router + +import ( + "encoding/json" + "fmt" + + pb "hertzbeat.apache.org/hertzbeat-collector-go/api" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// MessageType constants matching Java version +const ( + MessageTypeHeartbeat int32 = 0 + MessageTypeGoOnline int32 = 1 + MessageTypeGoOffline int32 = 2 + MessageTypeGoClose int32 = 3 + MessageTypeIssueCyclicTask int32 = 4 + MessageTypeDeleteCyclicTask int32 = 5 + MessageTypeIssueOneTimeTask int32 = 6 + MessageTypeResponseCyclicTaskData int32 = 7 + MessageTypeResponseOneTimeTaskData int32 = 8 + MessageTypeResponseCyclicTaskSdData int32 = 9 +) + +// MessageRouter is the interface for routing messages between transport and job components +type MessageRouter interface { + // RegisterProcessors registers all message processors + 
RegisterProcessors() + + // SendResult sends collection results back to the manager + SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error + + // GetIdentity returns the collector identity + GetIdentity() string +} + +// MessageRouterImpl implements the MessageRouter interface +type MessageRouterImpl struct { + logger logger.Logger + client transport.TransportClient + jobRunner *job.Runner + identity string +} + +// Config represents message router configuration +type Config struct { + Logger logger.Logger + Client transport.TransportClient + JobRunner *job.Runner + Identity string +} + +// New creates a new message router +func New(cfg *Config) MessageRouter { + return &MessageRouterImpl{ + logger: cfg.Logger.WithName("message-router"), + client: cfg.Client, + jobRunner: cfg.JobRunner, + identity: cfg.Identity, + } +} + +// RegisterProcessors registers all message processors +func (r *MessageRouterImpl) RegisterProcessors() { + if r.client == nil { + r.logger.Error(nil, "cannot register processors: client is nil") + return + } + + // Register cyclic task processor + r.client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg interface{}) (interface{}, error) { + if pbMsg, ok := msg.(*pb.Message); ok { + return r.handleIssueCyclicTask(pbMsg) + } + return nil, fmt.Errorf("invalid message type") + }) + + // Register delete cyclic task processor + r.client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg interface{}) (interface{}, error) { + if pbMsg, ok := msg.(*pb.Message); ok { + return r.handleDeleteCyclicTask(pbMsg) + } + return nil, fmt.Errorf("invalid message type") + }) + + // Register one-time task processor + r.client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg interface{}) (interface{}, error) { + if pbMsg, ok := msg.(*pb.Message); ok { + return r.handleIssueOneTimeTask(pbMsg) + } + return nil, fmt.Errorf("invalid message type") + }) + + // Other processors can be registered here + r.logger.Info("Successfully 
registered all message processors") +} + +// handleIssueCyclicTask handles cyclic task messages +func (r *MessageRouterImpl) handleIssueCyclicTask(msg *pb.Message) (*pb.Message, error) { + r.logger.Info("Handling cyclic task message") + + // Parse job from message + var job jobtypes.Job + if err := json.Unmarshal(msg.Msg, &job); err != nil { + r.logger.Error(err, "failed to unmarshal job") + return &pb.Message{ + Type: pb.MessageType_ISSUE_CYCLIC_TASK, + Direction: pb.Direction_RESPONSE, + Identity: r.identity, + Msg: []byte("failed to unmarshal job"), + }, nil + } + + // Add job to job runner + if err := r.jobRunner.AddAsyncCollectJob(&job); err != nil { + r.logger.Error(err, "failed to add job to job runner", "jobID", job.ID) + return &pb.Message{ + Type: pb.MessageType_ISSUE_CYCLIC_TASK, + Direction: pb.Direction_RESPONSE, + Identity: r.identity, + Msg: []byte(fmt.Sprintf("failed to add job: %v", err)), + }, nil + } + + return &pb.Message{ + Type: pb.MessageType_ISSUE_CYCLIC_TASK, + Direction: pb.Direction_RESPONSE, + Identity: r.identity, + Msg: []byte("job added successfully"), + }, nil +} + +// handleDeleteCyclicTask handles delete cyclic task messages +func (r *MessageRouterImpl) handleDeleteCyclicTask(msg *pb.Message) (*pb.Message, error) { + r.logger.Info("Handling delete cyclic task message") + + // Parse job IDs from message + var jobIDs []int64 + if err := json.Unmarshal(msg.Msg, &jobIDs); err != nil { + r.logger.Error(err, "failed to unmarshal job IDs") + return &pb.Message{ + Type: pb.MessageType_DELETE_CYCLIC_TASK, + Direction: pb.Direction_RESPONSE, + Identity: r.identity, + Msg: []byte("failed to unmarshal job IDs"), + }, nil + } + + // Remove jobs from job runner + for _, jobID := range jobIDs { + if err := r.jobRunner.RemoveAsyncCollectJob(jobID); err != nil { + r.logger.Error(err, "failed to remove job from job runner", "jobID", jobID) + // Continue with other jobs even if one fails + } + } + + return &pb.Message{ + Type: 
pb.MessageType_DELETE_CYCLIC_TASK, + Direction: pb.Direction_RESPONSE, + Identity: r.identity, + Msg: []byte("jobs removed successfully"), + }, nil +} + +// handleIssueOneTimeTask handles one-time task messages +func (r *MessageRouterImpl) handleIssueOneTimeTask(msg *pb.Message) (*pb.Message, error) { + r.logger.Info("Handling one-time task message") + + // Parse job from message + var job jobtypes.Job + if err := json.Unmarshal(msg.Msg, &job); err != nil { + r.logger.Error(err, "failed to unmarshal job") + return &pb.Message{ + Type: pb.MessageType_ISSUE_ONE_TIME_TASK, + Direction: pb.Direction_RESPONSE, + Identity: r.identity, + Msg: []byte("failed to unmarshal job"), + }, nil + } + + // Set job as non-cyclic + job.IsCyclic = false + + // Add job to job runner + if err := r.jobRunner.AddAsyncCollectJob(&job); err != nil { + r.logger.Error(err, "failed to add one-time job to job runner", "jobID", job.ID) + return &pb.Message{ + Type: pb.MessageType_ISSUE_ONE_TIME_TASK, + Direction: pb.Direction_RESPONSE, + Identity: r.identity, + Msg: []byte(fmt.Sprintf("failed to add one-time job: %v", err)), + }, nil + } + + return &pb.Message{ + Type: pb.MessageType_ISSUE_ONE_TIME_TASK, + Direction: pb.Direction_RESPONSE, + Identity: r.identity, + Msg: []byte("one-time job added successfully"), + }, nil +} + +// SendResult sends collection results back to the manager +func (r *MessageRouterImpl) SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error { + if r.client == nil || !r.client.IsStarted() { + return fmt.Errorf("transport client not started") + } + + // Determine message type based on job type + var msgType pb.MessageType + if job.IsCyclic { + msgType = pb.MessageType_RESPONSE_CYCLIC_TASK_DATA + } else { + msgType = pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA + } + + // Serialize metrics data + dataBytes, err := json.Marshal([]jobtypes.CollectRepMetricsData{*data}) + if err != nil { + return fmt.Errorf("failed to marshal metrics data: %w", err) + } + + 
// Create message + msg := &pb.Message{ + Type: msgType, + Direction: pb.Direction_REQUEST, + Identity: r.identity, + Msg: dataBytes, + } + + // Send message + if err := r.client.SendMsg(msg); err != nil { + return fmt.Errorf("failed to send metrics data: %w", err) + } + + r.logger.Info("Successfully sent metrics data", + "jobID", job.ID, + "metricsName", data.Metrics, + "isCyclic", job.IsCyclic) + + return nil +} + +// GetIdentity returns the collector identity +func (r *MessageRouterImpl) GetIdentity() string { + return r.identity +} diff --git a/internal/collector/common/transport/transport.go b/internal/collector/common/transport/transport.go index ee1da89..44058e3 100644 --- a/internal/collector/common/transport/transport.go +++ b/internal/collector/common/transport/transport.go @@ -45,8 +45,9 @@ const ( ) type Runner struct { - Config *configtypes.CollectorConfig - client transport.TransportClient + Config *configtypes.CollectorConfig + client transport.TransportClient + jobScheduler transport.JobScheduler clrserver.Server } @@ -59,6 +60,11 @@ func New(cfg *configtypes.CollectorConfig) *Runner { } } +// SetJobScheduler sets the job scheduler for the transport runner +func (r *Runner) SetJobScheduler(scheduler transport.JobScheduler) { + r.jobScheduler = scheduler +} + // NewFromConfig creates a new transport runner from collector configuration func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner { if cfg == nil { @@ -150,7 +156,14 @@ func (r *Runner) Start(ctx context.Context) error { r.Logger.Error(event.Error, "Failed to connect to manager gRPC server", "addr", event.Address) } }) - transport.RegisterDefaultProcessors(c) + // Register processors with job scheduler + if r.jobScheduler != nil { + transport.RegisterDefaultProcessors(c, r.jobScheduler) + r.Logger.Info("Registered gRPC processors with job scheduler") + } else { + transport.RegisterDefaultProcessors(c, nil) + r.Logger.Info("Registered gRPC processors without job scheduler") + } case 
*transport.NettyClient: c.SetEventHandler(func(event transport.Event) { switch event.Type { @@ -163,7 +176,14 @@ func (r *Runner) Start(ctx context.Context) error { r.Logger.Error(event.Error, "Failed to connect to manager netty server", "addr", event.Address) } }) - transport.RegisterDefaultNettyProcessors(c) + // Register processors with job scheduler + if r.jobScheduler != nil { + transport.RegisterDefaultNettyProcessors(c, r.jobScheduler) + r.Logger.Info("Registered netty processors with job scheduler") + } else { + transport.RegisterDefaultNettyProcessors(c, nil) + r.Logger.Info("Registered netty processors without job scheduler") + } } if err := r.client.Start(); err != nil { diff --git a/internal/collector/common/types/job/job_types.go b/internal/collector/common/types/job/job_types.go index 28478b8..70c3c10 100644 --- a/internal/collector/common/types/job/job_types.go +++ b/internal/collector/common/types/job/job_types.go @@ -30,9 +30,9 @@ type Job struct { Hide bool `json:"hide"` Category string `json:"category"` App string `json:"app"` - Name map[string]string `json:"name"` - Help map[string]string `json:"help"` - HelpLink map[string]string `json:"helpLink"` + Name interface{} `json:"name"` // Can be string or map[string]string for i18n + Help interface{} `json:"help"` // Can be string or map[string]string for i18n + HelpLink interface{} `json:"helpLink"` // Can be string or map[string]string for i18n Timestamp int64 `json:"timestamp"` DefaultInterval int64 `json:"defaultInterval"` Intervals []int64 `json:"intervals"` @@ -41,7 +41,10 @@ type Job struct { Metrics []Metrics `json:"metrics"` Configmap []Configmap `json:"configmap"` IsSd bool `json:"isSd"` + Sd bool `json:"sd"` PrometheusProxyMode bool `json:"prometheusProxyMode"` + Cyclic bool `json:"cyclic"` + Interval int64 `json:"interval"` // Internal fields EnvConfigmaps map[string]Configmap `json:"-"` diff --git a/internal/collector/common/types/job/metrics_types.go 
b/internal/collector/common/types/job/metrics_types.go index 6127238..04f035e 100644 --- a/internal/collector/common/types/job/metrics_types.go +++ b/internal/collector/common/types/job/metrics_types.go @@ -17,29 +17,37 @@ package job -import "time" +import ( + "fmt" + "time" +) // Metrics represents a metric configuration type Metrics struct { Name string `json:"name"` + I18n interface{} `json:"i18n,omitempty"` // Internationalization info Priority int `json:"priority"` + CollectTime int64 `json:"collectTime"` + Interval int64 `json:"interval"` + Visible *bool `json:"visible"` Fields []Field `json:"fields"` Aliasfields []string `json:"aliasfields"` - Calculates []Calculate `json:"calculates"` - Units []Unit `json:"units"` + AliasFields []string `json:"aliasFields"` // Alternative field name + Calculates interface{} `json:"calculates"` // Can be []Calculate or []string + Filters interface{} `json:"filters"` + Units interface{} `json:"units"` // Can be []Unit or []string Protocol string `json:"protocol"` Host string `json:"host"` Port string `json:"port"` Timeout string `json:"timeout"` - Interval int64 `json:"interval"` Range string `json:"range"` - Visible *bool `json:"visible"` ConfigMap map[string]string `json:"configMap"` + HasSubTask bool `json:"hasSubTask"` // Protocol specific fields HTTP *HTTPProtocol `json:"http,omitempty"` SSH *SSHProtocol `json:"ssh,omitempty"` - JDBC *JDBCProtocol `json:"jdbc,omitempty"` + JDBC interface{} `json:"jdbc,omitempty"` // Can be JDBCProtocol or map[string]interface{} SNMP *SNMPProtocol `json:"snmp,omitempty"` JMX *JMXProtocol `json:"jmx,omitempty"` Redis *RedisProtocol `json:"redis,omitempty"` @@ -54,6 +62,7 @@ type Field struct { Unit string `json:"unit"` Instance bool `json:"instance"` Value interface{} `json:"value"` + I18n interface{} `json:"i18n,omitempty"` // Internationalization info } // Calculate represents a calculation configuration @@ -71,8 +80,10 @@ type Unit struct { // ParamDefine represents a parameter 
definition type ParamDefine struct { + ID interface{} `json:"id"` + App interface{} `json:"app"` Field string `json:"field"` - Name string `json:"name"` + Name interface{} `json:"name"` // Can be string or map[string]string for i18n Type string `json:"type"` Required bool `json:"required"` DefaultValue interface{} `json:"defaultValue"` @@ -80,8 +91,38 @@ type ParamDefine struct { Range string `json:"range"` Limit int `json:"limit"` Options []Option `json:"options"` - Depend *Depend `json:"depend"` + KeyAlias interface{} `json:"keyAlias"` + ValueAlias interface{} `json:"valueAlias"` Hide bool `json:"hide"` + Creator interface{} `json:"creator"` + Modifier interface{} `json:"modifier"` + GmtCreate interface{} `json:"gmtCreate"` + GmtUpdate interface{} `json:"gmtUpdate"` + Depend *Depend `json:"depend"` +} + +// GetName returns the name as string, handling both string and i18n map cases +func (p *ParamDefine) GetName() string { + switch v := p.Name.(type) { + case string: + return v + case map[string]interface{}: + // Try to get English name first, then any available language + if name, ok := v["en"]; ok { + if s, ok := name.(string); ok { + return s + } + } + // Fall back to first available name + for _, val := range v { + if s, ok := val.(string); ok { + return s + } + } + return "unknown" + default: + return fmt.Sprintf("%v", v) + } } // Option represents a parameter option @@ -156,18 +197,18 @@ type SSHProtocol struct { // JDBCProtocol represents JDBC protocol configuration type JDBCProtocol struct { - Host string `json:"host"` - Port string `json:"port"` - Platform string `json:"platform"` - Username string `json:"username"` - Password string `json:"password"` - Database string `json:"database"` - Timeout string `json:"timeout"` - QueryType string `json:"queryType"` - SQL string `json:"sql"` - URL string `json:"url"` - ReuseConnection string `json:"reuseConnection"` - SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"` + Host string `json:"host"` + Port string 
`json:"port"` + Platform string `json:"platform"` + Username string `json:"username"` + Password string `json:"password"` + Database string `json:"database"` + Timeout string `json:"timeout"` + QueryType string `json:"queryType"` + SQL string `json:"sql"` + URL string `json:"url"` + ReuseConnection string `json:"reuseConnection"` + SSHTunnel map[string]interface{} `json:"sshTunnel,omitempty"` } // SNMPProtocol represents SNMP protocol configuration @@ -270,7 +311,35 @@ func (j *Job) Clone() *Job { if j.Metrics != nil { clone.Metrics = make([]Metrics, len(j.Metrics)) - copy(clone.Metrics, j.Metrics) + for i, metric := range j.Metrics { + clone.Metrics[i] = metric + + // Deep copy ConfigMap for each metric to avoid concurrent access + if metric.ConfigMap != nil { + clone.Metrics[i].ConfigMap = make(map[string]string, len(metric.ConfigMap)) + for k, v := range metric.ConfigMap { + clone.Metrics[i].ConfigMap[k] = v + } + } + + // Deep copy Fields slice + if metric.Fields != nil { + clone.Metrics[i].Fields = make([]Field, len(metric.Fields)) + copy(clone.Metrics[i].Fields, metric.Fields) + } + + // Deep copy Aliasfields slice + if metric.Aliasfields != nil { + clone.Metrics[i].Aliasfields = make([]string, len(metric.Aliasfields)) + copy(clone.Metrics[i].Aliasfields, metric.Aliasfields) + } + + // Deep copy AliasFields slice + if metric.AliasFields != nil { + clone.Metrics[i].AliasFields = make([]string, len(metric.AliasFields)) + copy(clone.Metrics[i].AliasFields, metric.AliasFields) + } + } } if j.Configmap != nil { diff --git a/internal/transport/netty_client.go b/internal/transport/netty_client.go index d598149..ebaeade 100644 --- a/internal/transport/netty_client.go +++ b/internal/transport/netty_client.go @@ -503,7 +503,7 @@ func (f *TransportClientFactory) CreateClient(protocol, addr string) (TransportC } // RegisterDefaultProcessors registers all default message processors for Netty client -func RegisterDefaultNettyProcessors(client *NettyClient) { +func 
RegisterDefaultNettyProcessors(client *NettyClient, scheduler JobScheduler) { client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { processor := &HeartbeatProcessor{} @@ -538,7 +538,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) { client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { - processor := &CollectCyclicDataProcessor{client: nil} + processor := &CollectCyclicDataProcessor{client: nil, scheduler: scheduler} return processor.Process(pbMsg) } return nil, fmt.Errorf("invalid message type") @@ -546,7 +546,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) { client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { - processor := &DeleteCyclicTaskProcessor{client: nil} + processor := &DeleteCyclicTaskProcessor{client: nil, scheduler: scheduler} return processor.Process(pbMsg) } return nil, fmt.Errorf("invalid message type") @@ -554,7 +554,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) { client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { - processor := &CollectOneTimeDataProcessor{client: nil} + processor := &CollectOneTimeDataProcessor{client: nil, scheduler: scheduler} return processor.Process(pbMsg) } return nil, fmt.Errorf("invalid message type") diff --git a/internal/transport/processors.go b/internal/transport/processors.go index 8bd71e3..b9df9a3 100644 --- a/internal/transport/processors.go +++ b/internal/transport/processors.go @@ -18,11 +18,18 @@ package transport import ( + "encoding/json" "fmt" "log" + "os" "time" pb "hertzbeat.apache.org/hertzbeat-collector-go/api" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" ) // Message type constants matching Java version @@ -44,6 +51,23 @@ type MessageProcessor interface { Process(msg *pb.Message) (*pb.Message, error) } +// JobScheduler defines the interface for job scheduling operations +// This interface allows transport layer to interact with job scheduling without direct dependencies +type JobScheduler interface { + // AddAsyncCollectJob adds a job to async collection scheduling + AddAsyncCollectJob(job *jobtypes.Job) error + + // RemoveAsyncCollectJob removes a job from scheduling + RemoveAsyncCollectJob(jobID int64) error +} + +// preprocessJobPasswords is a helper function to decrypt passwords in job configmap +// This should be called once when receiving a job to avoid repeated decryption +func preprocessJobPasswords(job *jobtypes.Job) error { + replacer := param.NewReplacer() + return replacer.PreprocessJobPasswords(job) +} + // HeartbeatProcessor handles heartbeat messages type HeartbeatProcessor struct{} @@ -63,7 +87,31 @@ func NewGoOnlineProcessor(client *GrpcClient) *GoOnlineProcessor { } func (p *GoOnlineProcessor) Process(msg *pb.Message) (*pb.Message, error) { - // Handle go online message + log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelInfo).WithName("go-online-processor") + + // Handle go online message - parse ServerInfo and extract AES secret + log.Info("received GO_ONLINE message from manager") + + if len(msg.Msg) == 0 { + log.Info("empty message from server, please upgrade server") + } else { + // Parse ServerInfo from message (matches Java: JsonUtil.fromJson(message.getMsg().toStringUtf8(), ServerInfo.class)) + var serverInfo struct { + AesSecret string `json:"aesSecret"` + } + + if err := json.Unmarshal(msg.Msg, 
&serverInfo); err != nil { + log.Error(err, "failed to parse ServerInfo from manager") + } else if serverInfo.AesSecret == "" { + log.Info("server response has empty secret, please check configuration") + } else { + // Set the AES secret key globally (matches Java: AesUtil.setDefaultSecretKey(serverInfo.getAesSecret())) + log.Info("received AES secret from manager", "keyLength", len(serverInfo.AesSecret)) + crypto.SetDefaultSecretKey(serverInfo.AesSecret) + } + } + + log.Info("online message processed successfully") return &pb.Message{ Type: pb.MessageType_GO_ONLINE, Direction: pb.Direction_RESPONSE, @@ -128,65 +176,238 @@ func (p *GoCloseProcessor) Process(msg *pb.Message) (*pb.Message, error) { // CollectCyclicDataProcessor handles cyclic task messages type CollectCyclicDataProcessor struct { - client *GrpcClient + client *GrpcClient + scheduler JobScheduler } -func NewCollectCyclicDataProcessor(client *GrpcClient) *CollectCyclicDataProcessor { - return &CollectCyclicDataProcessor{client: client} +func NewCollectCyclicDataProcessor(client *GrpcClient, scheduler JobScheduler) *CollectCyclicDataProcessor { + return &CollectCyclicDataProcessor{ + client: client, + scheduler: scheduler, + } } func (p *CollectCyclicDataProcessor) Process(msg *pb.Message) (*pb.Message, error) { - // Handle cyclic task message - // TODO: Implement actual task processing logic + log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelInfo).WithName("cyclic-task-processor") + log.Info("processing cyclic task message", "identity", msg.Identity) + + if p.scheduler == nil { + log.Info("no job scheduler available, cannot process cyclic task") + return &pb.Message{ + Type: pb.MessageType_ISSUE_CYCLIC_TASK, + Direction: pb.Direction_RESPONSE, + Identity: msg.Identity, + Msg: []byte("no job scheduler available"), + }, nil + } + + // Parse job from message + log.Info("Received cyclic task JSON: %s", string(msg.Msg)) + + var job jobtypes.Job + if err := json.Unmarshal(msg.Msg, &job); err != nil 
{ + log.Error(err, "Failed to unmarshal job from cyclic task message: %v") + return &pb.Message{ + Type: pb.MessageType_ISSUE_CYCLIC_TASK, + Direction: pb.Direction_RESPONSE, + Identity: msg.Identity, + Msg: []byte(fmt.Sprintf("failed to parse job: %v", err)), + }, nil + } + + // Preprocess passwords once (decrypt encrypted passwords in configmap) + // This avoids repeated decryption during each task execution + if err := preprocessJobPasswords(&job); err != nil { + log.Error(err, "Failed to preprocess job passwords") + } + + // Ensure job is marked as cyclic + job.IsCyclic = true + + log.Info("Adding cyclic job to scheduler", + "jobID", job.ID, + "monitorID", job.MonitorID, + "app", job.App) + + // Add job to scheduler + if err := p.scheduler.AddAsyncCollectJob(&job); err != nil { + log.Error(err, "Failed to add cyclic job to scheduler: %v") + return &pb.Message{ + Type: pb.MessageType_ISSUE_CYCLIC_TASK, + Direction: pb.Direction_RESPONSE, + Identity: msg.Identity, + Msg: []byte(fmt.Sprintf("failed to schedule job: %v", err)), + }, nil + } + return &pb.Message{ Type: pb.MessageType_ISSUE_CYCLIC_TASK, Direction: pb.Direction_RESPONSE, Identity: msg.Identity, - Msg: []byte("cyclic task ack"), + Msg: []byte("cyclic task scheduled successfully"), }, nil } // DeleteCyclicTaskProcessor handles delete cyclic task messages type DeleteCyclicTaskProcessor struct { - client *GrpcClient + client *GrpcClient + scheduler JobScheduler } -func NewDeleteCyclicTaskProcessor(client *GrpcClient) *DeleteCyclicTaskProcessor { - return &DeleteCyclicTaskProcessor{client: client} +func NewDeleteCyclicTaskProcessor(client *GrpcClient, scheduler JobScheduler) *DeleteCyclicTaskProcessor { + return &DeleteCyclicTaskProcessor{ + client: client, + scheduler: scheduler, + } } func (p *DeleteCyclicTaskProcessor) Process(msg *pb.Message) (*pb.Message, error) { - // Handle delete cyclic task message + log.Printf("Processing delete cyclic task message, identity: %s", msg.Identity) + + if p.scheduler 
== nil { + log.Printf("No job scheduler available, cannot process delete cyclic task") + return &pb.Message{ + Type: pb.MessageType_DELETE_CYCLIC_TASK, + Direction: pb.Direction_RESPONSE, + Identity: msg.Identity, + Msg: []byte("no job scheduler available"), + }, nil + } + + // Parse job IDs from message - could be a single job ID or array of IDs + var jobIDs []int64 + + // Try to parse as array first + if err := json.Unmarshal(msg.Msg, &jobIDs); err != nil { + // If array parsing fails, try single job ID + var singleJobID int64 + if err2 := json.Unmarshal(msg.Msg, &singleJobID); err2 != nil { + log.Printf("Failed to unmarshal job IDs from delete message: %v, %v", err, err2) + return &pb.Message{ + Type: pb.MessageType_DELETE_CYCLIC_TASK, + Direction: pb.Direction_RESPONSE, + Identity: msg.Identity, + Msg: []byte(fmt.Sprintf("failed to parse job IDs: %v", err)), + }, nil + } + jobIDs = []int64{singleJobID} + } + + log.Printf("Removing %d cyclic jobs from scheduler: %v", len(jobIDs), jobIDs) + + // Remove jobs from scheduler + var errors []string + for _, jobID := range jobIDs { + if err := p.scheduler.RemoveAsyncCollectJob(jobID); err != nil { + log.Printf("Failed to remove job %d from scheduler: %v", jobID, err) + errors = append(errors, fmt.Sprintf("job %d: %v", jobID, err)) + } else { + log.Printf("Successfully removed job %d from scheduler", jobID) + } + } + + // Prepare response message + var responseMsg string + if len(errors) > 0 { + responseMsg = fmt.Sprintf("partially completed, errors: %v", errors) + } else { + responseMsg = "all cyclic tasks removed successfully" + } + return &pb.Message{ Type: pb.MessageType_DELETE_CYCLIC_TASK, Direction: pb.Direction_RESPONSE, Identity: msg.Identity, - Msg: []byte("delete cyclic task ack"), + Msg: []byte(responseMsg), }, nil } // CollectOneTimeDataProcessor handles one-time task messages type CollectOneTimeDataProcessor struct { - client *GrpcClient + client *GrpcClient + scheduler JobScheduler } -func 
NewCollectOneTimeDataProcessor(client *GrpcClient) *CollectOneTimeDataProcessor { - return &CollectOneTimeDataProcessor{client: client} +func NewCollectOneTimeDataProcessor(client *GrpcClient, scheduler JobScheduler) *CollectOneTimeDataProcessor { + return &CollectOneTimeDataProcessor{ + client: client, + scheduler: scheduler, + } } func (p *CollectOneTimeDataProcessor) Process(msg *pb.Message) (*pb.Message, error) { - // Handle one-time task message - // TODO: Implement actual task processing logic + log.Printf("Processing one-time task message, identity: %s", msg.Identity) + + if p.scheduler == nil { + log.Printf("No job scheduler available, cannot process one-time task") + return &pb.Message{ + Type: pb.MessageType_ISSUE_ONE_TIME_TASK, + Direction: pb.Direction_RESPONSE, + Identity: msg.Identity, + Msg: []byte("no job scheduler available"), + }, nil + } + + // Parse job from message + log.Printf("Received one-time task JSON: %s", string(msg.Msg)) + + // Parse JSON to check interval values before unmarshaling + var rawJob map[string]interface{} + if err := json.Unmarshal(msg.Msg, &rawJob); err == nil { + if defaultInterval, ok := rawJob["defaultInterval"]; ok { + log.Printf("DEBUG: Raw defaultInterval from JSON: %v (type: %T)", defaultInterval, defaultInterval) + } + if interval, ok := rawJob["interval"]; ok { + log.Printf("DEBUG: Raw interval from JSON: %v (type: %T)", interval, interval) + } + } + + var job jobtypes.Job + if err := json.Unmarshal(msg.Msg, &job); err != nil { + log.Printf("Failed to unmarshal job from one-time task message: %v", err) + log.Printf("JSON content: %s", string(msg.Msg)) + return &pb.Message{ + Type: pb.MessageType_ISSUE_ONE_TIME_TASK, + Direction: pb.Direction_RESPONSE, + Identity: msg.Identity, + Msg: []byte(fmt.Sprintf("failed to parse job: %v", err)), + }, nil + } + + // Preprocess passwords once (decrypt encrypted passwords in configmap) + // This avoids repeated decryption during each task execution + if err := 
preprocessJobPasswords(&job); err != nil { + log.Printf("Failed to preprocess job passwords: %v", err) + } + + // Ensure job is marked as non-cyclic + job.IsCyclic = false + + log.Printf("Adding one-time job to scheduler: jobID=%d, monitorID=%d, app=%s", + job.ID, job.MonitorID, job.App) + + // Add job to scheduler + if err := p.scheduler.AddAsyncCollectJob(&job); err != nil { + log.Printf("Failed to add one-time job to scheduler: %v", err) + return &pb.Message{ + Type: pb.MessageType_ISSUE_ONE_TIME_TASK, + Direction: pb.Direction_RESPONSE, + Identity: msg.Identity, + Msg: []byte(fmt.Sprintf("failed to schedule job: %v", err)), + }, nil + } + + log.Printf("Successfully scheduled one-time job: jobID=%d", job.ID) return &pb.Message{ Type: pb.MessageType_ISSUE_ONE_TIME_TASK, Direction: pb.Direction_RESPONSE, Identity: msg.Identity, - Msg: []byte("one-time task ack"), + Msg: []byte("one-time task scheduled successfully"), }, nil } // RegisterDefaultProcessors registers all default message processors -func RegisterDefaultProcessors(client *GrpcClient) { +func RegisterDefaultProcessors(client *GrpcClient, scheduler JobScheduler) { client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { processor := &HeartbeatProcessor{} @@ -221,7 +442,7 @@ func RegisterDefaultProcessors(client *GrpcClient) { client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { - processor := NewCollectCyclicDataProcessor(client) + processor := NewCollectCyclicDataProcessor(client, scheduler) return processor.Process(pbMsg) } return nil, fmt.Errorf("invalid message type") @@ -229,7 +450,7 @@ func RegisterDefaultProcessors(client *GrpcClient) { client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { - processor := NewDeleteCyclicTaskProcessor(client) + processor := 
NewDeleteCyclicTaskProcessor(client, scheduler) return processor.Process(pbMsg) } return nil, fmt.Errorf("invalid message type") @@ -237,7 +458,7 @@ func RegisterDefaultProcessors(client *GrpcClient) { client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg interface{}) (interface{}, error) { if pbMsg, ok := msg.(*pb.Message); ok { - processor := NewCollectOneTimeDataProcessor(client) + processor := NewCollectOneTimeDataProcessor(client, scheduler) return processor.Process(pbMsg) } return nil, fmt.Errorf("invalid message type") diff --git a/internal/util/arrow/arrow_serializer.go b/internal/util/arrow/arrow_serializer.go new file mode 100644 index 0000000..1915b43 --- /dev/null +++ b/internal/util/arrow/arrow_serializer.go @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package arrow + +import ( + "bytes" + "encoding/json" + "fmt" + + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// ArrowSerializer handles Arrow format serialization for metrics data +type ArrowSerializer struct { + logger logger.Logger +} + +// NewArrowSerializer creates a new Arrow serializer +func NewArrowSerializer(logger logger.Logger) *ArrowSerializer { + return &ArrowSerializer{ + logger: logger.WithName("arrow-serializer"), + } +} + +// SerializeMetricsData serializes metrics data to Arrow format +// For now, we'll use a compatible format that Manager can handle +func (as *ArrowSerializer) SerializeMetricsData(dataList []*jobtypes.CollectRepMetricsData) ([]byte, error) { + if len(dataList) == 0 { + return nil, fmt.Errorf("empty data list") + } + + // For compatibility with Java Manager, we need to create a format that + // ArrowUtil.deserializeMetricsData() can handle + + // First, let's try to create a simple Arrow-compatible format + // Since we don't have full Arrow library, we'll create a minimal implementation + + // Convert to a format similar to what Java expects + arrowCompatibleData := make([]map[string]interface{}, len(dataList)) + + for i, data := range dataList { + // Create Arrow-like structure + arrowData := map[string]interface{}{ + "id": data.ID, + "tenantId": data.TenantID, + "app": data.App, + "metrics": data.Metrics, + "priority": data.Priority, + "time": data.Time, + "code": data.Code, + "msg": data.Msg, + } + + // Convert Values to Arrow-compatible format + if len(data.Values) > 0 { + // Create column-based structure like Arrow + columns := make(map[string][]interface{}) + + for _, valueRow := range data.Values { + for i, value := range valueRow.Columns { + if i < len(data.Fields) { + fieldName := data.Fields[i].Field + if columns[fieldName] == nil { + columns[fieldName] = make([]interface{}, 0, 
len(data.Values)) + } + columns[fieldName] = append(columns[fieldName], value) + } + } + } + + arrowData["columns"] = columns + arrowData["rowCount"] = len(data.Values) + } + + arrowCompatibleData[i] = arrowData + } + + // For now, serialize as JSON but with Arrow-like structure + // TODO: Implement proper Arrow serialization when Arrow library is available + jsonBytes, err := json.Marshal(arrowCompatibleData) + if err != nil { + as.logger.Error(err, "failed to serialize arrow-compatible data") + return nil, fmt.Errorf("failed to serialize arrow-compatible data: %w", err) + } + + // Create a minimal Arrow-like binary format + // This is a temporary solution until we have proper Arrow support + return as.createArrowLikeBinary(jsonBytes) +} + +// createArrowLikeBinary creates a binary format that might be compatible with Arrow reader +func (as *ArrowSerializer) createArrowLikeBinary(jsonData []byte) ([]byte, error) { + // Create a simple binary format that includes: + // 1. Magic bytes (Arrow-like) + // 2. Schema information + // 3. 
Data + + var buffer bytes.Buffer + + // Write Arrow-like magic bytes + // This is a simplified version - real Arrow has specific magic bytes + magic := []byte("ARROW1\x00\x00") + buffer.Write(magic) + + // Write schema length (placeholder) + schemaLen := uint32(0) // No schema for now + buffer.Write([]byte{byte(schemaLen), byte(schemaLen >> 8), byte(schemaLen >> 16), byte(schemaLen >> 24)}) + + // Write data length + dataLen := uint32(len(jsonData)) + buffer.Write([]byte{byte(dataLen), byte(dataLen >> 8), byte(dataLen >> 16), byte(dataLen >> 24)}) + + // Write the JSON data (as a temporary measure) + buffer.Write(jsonData) + + as.logger.V(1).Info("created arrow-like binary format", + "originalSize", len(jsonData), + "binarySize", buffer.Len()) + + return buffer.Bytes(), nil +} + +// FallbackToJSON provides JSON serialization as fallback +func (as *ArrowSerializer) FallbackToJSON(dataList []*jobtypes.CollectRepMetricsData) ([]byte, error) { + as.logger.Info("falling back to JSON serialization", "dataCount", len(dataList)) + + jsonBytes, err := json.Marshal(dataList) + if err != nil { + return nil, fmt.Errorf("failed to serialize to JSON: %w", err) + } + + return jsonBytes, nil +} diff --git a/internal/util/crypto/aes_util.go b/internal/util/crypto/aes_util.go new file mode 100644 index 0000000..678760f --- /dev/null +++ b/internal/util/crypto/aes_util.go @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package crypto + +import ( + "crypto/aes" + "crypto/cipher" + "encoding/base64" + "fmt" + "os" + "sync" + "unicode" + + loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// AESUtil provides AES encryption/decryption utilities +// This matches Java's AesUtil functionality +type AESUtil struct { + secretKey string + mutex sync.RWMutex +} + +var ( + defaultAESUtil = &AESUtil{} + once sync.Once + log = logger.DefaultLogger(os.Stdout, loggertype.LogLevelInfo).WithName("aes-util") +) + +// GetDefaultAESUtil returns the singleton AES utility instance +func GetDefaultAESUtil() *AESUtil { + once.Do(func() { + defaultAESUtil = &AESUtil{} + }) + return defaultAESUtil +} + +// SetDefaultSecretKey sets the default AES secret key (matches Java: AesUtil.setDefaultSecretKey) +func SetDefaultSecretKey(secretKey string) { + GetDefaultAESUtil().SetSecretKey(secretKey) +} + +// GetDefaultSecretKey gets the current AES secret key (matches Java: AesUtil.getDefaultSecretKey) +func GetDefaultSecretKey() string { + return GetDefaultAESUtil().GetSecretKey() +} + +// SetSecretKey sets the AES secret key for this instance +func (a *AESUtil) SetSecretKey(secretKey string) { + a.mutex.Lock() + defer a.mutex.Unlock() + a.secretKey = secretKey + log.Info("AES secret key updated", "keyLength", len(secretKey)) +} + +// GetSecretKey gets the AES secret key for this instance +func (a *AESUtil) GetSecretKey() string { + a.mutex.RLock() + defer a.mutex.RUnlock()
+ return a.secretKey +} + +// AesDecode decrypts AES encrypted data (matches Java: AesUtil.aesDecode) +func (a *AESUtil) AesDecode(encryptedData string) (string, error) { + secretKey := a.GetSecretKey() + if secretKey == "" { + return "", fmt.Errorf("AES secret key not set") + } + return a.AesDecodeWithKey(encryptedData, secretKey) +} + +// AesDecodeWithKey decrypts AES encrypted data with specified key +func (a *AESUtil) AesDecodeWithKey(encryptedData, key string) (string, error) { + // Ensure key is exactly 16 bytes for AES-128 (Java requirement) + keyBytes := []byte(key) + if len(keyBytes) != 16 { + return "", fmt.Errorf("key must be exactly 16 bytes, got %d", len(keyBytes)) + } + + // Decode base64 encoded data + encryptedBytes, err := base64.StdEncoding.DecodeString(encryptedData) + if err != nil { + return "", fmt.Errorf("failed to decode base64: %w", err) + } + + // Create AES cipher + block, err := aes.NewCipher(keyBytes) + if err != nil { + return "", fmt.Errorf("failed to create AES cipher: %w", err) + } + + // Check if data length is valid for AES + if len(encryptedBytes) < aes.BlockSize || len(encryptedBytes)%aes.BlockSize != 0 { + return "", fmt.Errorf("invalid encrypted data length: %d", len(encryptedBytes)) + } + + // Java uses the key itself as IV (this matches Java's implementation) + iv := keyBytes + + // Create CBC decrypter + mode := cipher.NewCBCDecrypter(block, iv) + + // Decrypt + decrypted := make([]byte, len(encryptedBytes)) + mode.CryptBlocks(decrypted, encryptedBytes) + + // Remove PKCS5/PKCS7 padding (Go's PKCS7 is compatible with Java's PKCS5 for AES) + decrypted, err = removePKCS7Padding(decrypted) + if err != nil { + return "", fmt.Errorf("failed to remove padding: %w", err) + } + + return string(decrypted), nil +} + +// IsCiphertext determines whether text is encrypted (matches Java: AesUtil.isCiphertext) +func (a *AESUtil) IsCiphertext(text string) bool { + // First check if it's valid base64 + if _, err :=
base64.StdEncoding.DecodeString(text); err != nil { + return false + } + + // Try to decrypt and see if it succeeds + _, err := a.AesDecode(text) + return err == nil +} + +// removePKCS7Padding strips PKCS#7 padding from decrypted data. +// +// Padding is validated strictly, mirroring Java's "AES/CBC/PKCS5Padding" cipher, +// which throws BadPaddingException on malformed padding. Returning the data +// unchanged on bad padding would make decryption with a wrong key look +// successful, breaking IsCiphertext and any key-fallback logic in callers. +func removePKCS7Padding(data []byte) ([]byte, error) { + if len(data) == 0 { + return nil, fmt.Errorf("empty data") + } + + paddingLen := int(data[len(data)-1]) + // A valid pad byte lies in [1, aes.BlockSize] and cannot exceed the data length. + if paddingLen == 0 || paddingLen > aes.BlockSize || paddingLen > len(data) { + return nil, fmt.Errorf("invalid padding length: %d", paddingLen) + } + + // Every padding byte must equal the padding length. + for i := 0; i < paddingLen; i++ { + if data[len(data)-1-i] != byte(paddingLen) { + return nil, fmt.Errorf("invalid padding byte at offset %d", len(data)-1-i) + } + } + + return data[:len(data)-paddingLen], nil +} + +// isPrintableString checks if a string contains only printable characters +func isPrintableString(s string) bool { + if len(s) == 0 { + return false + } + + for _, r := range s { + if !unicode.IsPrint(r) && !unicode.IsSpace(r) { + return false + } + } + return true +} + +// Convenience functions that match Java's static methods + +// AesDecode decrypts with the default secret key +func AesDecode(encryptedData string) (string, error) { + return GetDefaultAESUtil().AesDecode(encryptedData) +} + +// AesDecodeWithKey decrypts with specified key +func AesDecodeWithKey(encryptedData, key string) (string, error) { + return GetDefaultAESUtil().AesDecodeWithKey(encryptedData, key) +} + +// IsCiphertext checks if text is encrypted using default key +func IsCiphertext(text string) bool { + return GetDefaultAESUtil().IsCiphertext(text) +} diff --git a/internal/util/param/param_replacer.go b/internal/util/param/param_replacer.go new file mode 100644 index 0000000..ff6a944 --- /dev/null +++ b/internal/util/param/param_replacer.go @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package param + +import ( + "encoding/json" + "fmt" + "os" + "strconv" + "strings" + + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +// Replacer is a standalone utility for parameter replacement +// This matches Java's parameter substitution mechanism where ^_^paramName^_^ gets replaced with actual values +type Replacer struct{} + +// NewReplacer creates a new parameter replacer +func NewReplacer() *Replacer { + return &Replacer{} +} + +// PreprocessJobPasswords decrypts passwords in job configmap once during job creation +// This permanently replaces encrypted passwords with decrypted ones in the job configmap +func (r *Replacer) PreprocessJobPasswords(job *jobtypes.Job) error { + if job == nil || job.Configmap == nil { + return nil + } + + log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug).WithName("password-preprocessor") + + // Decrypt passwords in configmap once and permanently replace them + for i := range job.Configmap { + config := &job.Configmap[i] // Get pointer to modify in place + if config.Type == 2 { // password type + if encryptedValue, ok := config.Value.(string); ok
{ + log.Sugar().Debugf("preprocessing encrypted password for key: %s", config.Key) + if decoded, err := r.decryptPassword(encryptedValue); err == nil { + log.Info("password preprocessing successful", "key", config.Key, "length", len(decoded)) + config.Value = decoded // Permanently replace encrypted value with decrypted value + config.Type = 1 // Change type to string since it's now decrypted + } else { + log.Error(err, "password preprocessing failed, keeping original value", "key", config.Key) + } + } + } + } + return nil +} + +// ReplaceJobParams replaces all parameter placeholders in job configuration +func (r *Replacer) ReplaceJobParams(job *jobtypes.Job) (*jobtypes.Job, error) { + if job == nil { + return nil, fmt.Errorf("job is nil") + } + + // Create parameter map from configmap (passwords should already be decrypted) + paramMap := r.createParamMapSimple(job.Configmap) + + // Clone the job to avoid modifying original + clonedJob := job.Clone() + if clonedJob == nil { + return nil, fmt.Errorf("failed to clone job") + } + + // Replace parameters in all metrics + for i := range clonedJob.Metrics { + if err := r.replaceMetricsParams(&clonedJob.Metrics[i], paramMap); err != nil { + return nil, fmt.Errorf("failed to replace params in metrics %s: %w", clonedJob.Metrics[i].Name, err) + } + } + + return clonedJob, nil +} + +// createParamMapSimple creates a parameter map from configmap entries +// Assumes passwords have already been decrypted by PreprocessJobPasswords +func (r *Replacer) createParamMapSimple(configmap []jobtypes.Configmap) map[string]string { + paramMap := make(map[string]string) + + for _, config := range configmap { + // Convert value to string, handle null values as empty strings + var strValue string + if config.Value == nil { + strValue = "" // null values become empty strings + } else { + switch config.Type { + case 0: // number + if numVal, ok := config.Value.(float64); ok { + strValue = strconv.FormatFloat(numVal, 'f', -1, 64) + } else if
intVal, ok := config.Value.(int); ok { + strValue = strconv.Itoa(intVal) + } else { + strValue = fmt.Sprintf("%v", config.Value) + } + case 1, 2: // string or password (both are now plain strings after preprocessing) + strValue = fmt.Sprintf("%v", config.Value) + default: + strValue = fmt.Sprintf("%v", config.Value) + } + } + paramMap[config.Key] = strValue + } + + return paramMap +} + +// createParamMap creates a parameter map from configmap entries (legacy version with decryption) +// +// Deprecated: Use PreprocessJobPasswords + createParamMapSimple instead. +func (r *Replacer) createParamMap(configmap []jobtypes.Configmap) map[string]string { + paramMap := make(map[string]string) + + for _, config := range configmap { + // Convert value to string based on type, handle null values as empty strings + var strValue string + if config.Value == nil { + strValue = "" // null values become empty strings + } else { + switch config.Type { + case 0: // number + if numVal, ok := config.Value.(float64); ok { + strValue = strconv.FormatFloat(numVal, 'f', -1, 64) + } else if intVal, ok := config.Value.(int); ok { + strValue = strconv.Itoa(intVal) + } else { + strValue = fmt.Sprintf("%v", config.Value) + } + case 1: // string + strValue = fmt.Sprintf("%v", config.Value) + case 2: // password (encrypted) + if encryptedValue, ok := config.Value.(string); ok { + log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug).WithName("param-replacer") + // Log the key, never the (encrypted or decrypted) secret itself. + log.Sugar().Debugf("attempting to decrypt password for key: %s", config.Key) + if decoded, err := r.decryptPassword(encryptedValue); err == nil { + log.Info("password decryption successful", "length", len(decoded)) + strValue = decoded + } else { + log.Error(err, "password decryption failed, using original value") + // Fallback to original value if decryption fails + strValue = encryptedValue + } + } else { + strValue = fmt.Sprintf("%v", config.Value) + } + default: + strValue = fmt.Sprintf("%v", config.Value) + } + } + paramMap[config.Key]
= strValue + } + + return paramMap +} + +// replaceMetricsParams replaces parameters in metrics configuration +func (r *Replacer) replaceMetricsParams(metrics *jobtypes.Metrics, paramMap map[string]string) error { + // Replace parameters in protocol-specific configurations + // Currently only JDBC is defined as interface{} in the struct definition + // Other protocols are concrete struct pointers and don't contain placeholders from JSON + if metrics.JDBC != nil { + if err := r.replaceProtocolParams(&metrics.JDBC, paramMap); err != nil { + return fmt.Errorf("failed to replace JDBC params: %w", err) + } + } + + // TODO: Add other protocol configurations as they are converted to interface{} types + // For now, other protocols (HTTP, SSH, etc.) are concrete struct pointers and + // don't require parameter replacement + + // Replace parameters in basic metrics fields + if err := r.replaceBasicMetricsParams(metrics, paramMap); err != nil { + return fmt.Errorf("failed to replace basic metrics params: %w", err) + } + + return nil +} + +// replaceProtocolParams replaces parameters in any protocol configuration +func (r *Replacer) replaceProtocolParams(protocolInterface *interface{}, paramMap map[string]string) error { + if *protocolInterface == nil { + return nil + } + + // Convert protocol interface{} to map for manipulation + protocolMap, ok := (*protocolInterface).(map[string]interface{}) + if !ok { + // If it's already processed or not a map, skip + return nil + } + + // Recursively replace parameters in all string values + return r.replaceParamsInMap(protocolMap, paramMap) +} + +// replaceParamsInMap recursively replaces parameters in a map structure +func (r *Replacer) replaceParamsInMap(data map[string]interface{}, paramMap map[string]string) error { + for key, value := range data { + switch v := value.(type) { + case string: + // Replace parameters in string values + data[key] = r.replaceParamPlaceholders(v, paramMap) + case map[string]interface{}: + // Recursively
handle nested maps + if err := r.replaceParamsInMap(v, paramMap); err != nil { + return fmt.Errorf("failed to replace params in nested map %s: %w", key, err) + } + case []interface{}: + // Handle arrays + for i, item := range v { + if itemMap, ok := item.(map[string]interface{}); ok { + if err := r.replaceParamsInMap(itemMap, paramMap); err != nil { + return fmt.Errorf("failed to replace params in array item %d: %w", i, err) + } + } else if itemStr, ok := item.(string); ok { + v[i] = r.replaceParamPlaceholders(itemStr, paramMap) + } + } + } + } + return nil +} + +// replaceBasicMetricsParams replaces parameters in basic metrics fields +func (r *Replacer) replaceBasicMetricsParams(metrics *jobtypes.Metrics, paramMap map[string]string) error { + // Replace parameters in basic string fields + metrics.Host = r.replaceParamPlaceholders(metrics.Host, paramMap) + metrics.Port = r.replaceParamPlaceholders(metrics.Port, paramMap) + metrics.Timeout = r.replaceParamPlaceholders(metrics.Timeout, paramMap) + metrics.Range = r.replaceParamPlaceholders(metrics.Range, paramMap) + + // Replace parameters in ConfigMap + if metrics.ConfigMap != nil { + for key, value := range metrics.ConfigMap { + metrics.ConfigMap[key] = r.replaceParamPlaceholders(value, paramMap) + } + } + + return nil +} + +// replaceParamPlaceholders replaces ^_^paramName^_^ placeholders with actual values. +// +// All placeholders are substituted in a single left-to-right pass (strings.Replacer), +// so text produced by a substitution is never scanned again. Applying strings.ReplaceAll +// once per map entry would re-scan substituted values, and because Go map iteration +// order is random, a value that itself contains a ^_^name^_^ sequence (e.g. inside a +// password) would be expanded or not depending on that order. +// Parameters with an empty name are ignored. +func (r *Replacer) replaceParamPlaceholders(template string, paramMap map[string]string) string { + // Fast path: nothing that could be a placeholder. + if template == "" || len(paramMap) == 0 || !strings.Contains(template, "^_^") { + return template + } + + oldnew := make([]string, 0, 2*len(paramMap)) + for paramName, paramValue := range paramMap { + // Guard against empty parameter names + if paramName == "" { + continue + } + oldnew = append(oldnew, "^_^"+paramName+"^_^", paramValue) + } + if len(oldnew) == 0 { + return template + } + + return strings.NewReplacer(oldnew...).Replace(template) +} + +// ExtractProtocolConfig extracts and processes protocol
configuration after parameter replacement +func (r *Replacer) ExtractProtocolConfig(protocolInterface interface{}, targetStruct interface{}) error { + if protocolInterface == nil { + return fmt.Errorf("protocol interface is nil") + } + + // If it's a map (from JSON parsing), convert to target struct + if protocolMap, ok := protocolInterface.(map[string]interface{}); ok { + // Convert map to JSON and then unmarshal to target struct + jsonData, err := json.Marshal(protocolMap) + if err != nil { + return fmt.Errorf("failed to marshal protocol config: %w", err) + } + + if err := json.Unmarshal(jsonData, targetStruct); err != nil { + return fmt.Errorf("failed to unmarshal protocol config: %w", err) + } + + return nil + } + + return fmt.Errorf("unsupported protocol config type: %T", protocolInterface) +} + +// ExtractJDBCConfig extracts and processes JDBC configuration after parameter replacement +func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{}) (*jobtypes.JDBCProtocol, error) { + if jdbcInterface == nil { + return nil, nil + } + + // If it's already a JDBCProtocol struct + if jdbcConfig, ok := jdbcInterface.(*jobtypes.JDBCProtocol); ok { + return jdbcConfig, nil + } + + // Use the generic extraction method + var jdbcConfig jobtypes.JDBCProtocol + if err := r.ExtractProtocolConfig(jdbcInterface, &jdbcConfig); err != nil { + return nil, fmt.Errorf("failed to extract JDBC config: %w", err) + } + + return &jdbcConfig, nil +} + +// ExtractHTTPConfig extracts and processes HTTP configuration after parameter replacement +func (r *Replacer) ExtractHTTPConfig(httpInterface interface{}) (*jobtypes.HTTPProtocol, error) { + if httpInterface == nil { + return nil, nil + } + + // If it's already an HTTPProtocol struct + if httpConfig, ok := httpInterface.(*jobtypes.HTTPProtocol); ok { + return httpConfig, nil + } + + // Use the generic extraction method + var httpConfig jobtypes.HTTPProtocol + if err := r.ExtractProtocolConfig(httpInterface, &httpConfig); err != nil {
+ return nil, fmt.Errorf("failed to extract HTTP config: %w", err) + } + + return &httpConfig, nil +} + +// ExtractSSHConfig extracts and processes SSH configuration after parameter replacement +func (r *Replacer) ExtractSSHConfig(sshInterface interface{}) (*jobtypes.SSHProtocol, error) { + if sshInterface == nil { + return nil, nil + } + + // If it's already an SSHProtocol struct + if sshConfig, ok := sshInterface.(*jobtypes.SSHProtocol); ok { + return sshConfig, nil + } + + // Use the generic extraction method + var sshConfig jobtypes.SSHProtocol + if err := r.ExtractProtocolConfig(sshInterface, &sshConfig); err != nil { + return nil, fmt.Errorf("failed to extract SSH config: %w", err) + } + + return &sshConfig, nil +} + +// decryptPassword decrypts an encrypted password using AES. +// This implements the same algorithm as Java manager's AESUtil. +// +// It first tries the configured secret key (crypto.AesDecode) and then the built-in +// default Java key. If both attempts fail it returns the original value together +// with a non-nil error. Callers already fall back to the original value on error; +// the error lets them log the failure and keep the entry marked as a password +// instead of reporting success for a value that was never decrypted. +func (r *Replacer) decryptPassword(encryptedPassword string) (string, error) { + log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug).WithName("password-decrypt") + + // Use the AES utility (matches Java: AesUtil.aesDecode) + result, primaryErr := crypto.AesDecode(encryptedPassword) + if primaryErr == nil { + log.Info("password decrypted successfully", "length", len(result)) + return result, nil + } + log.Sugar().Debugf("primary decryption failed: %v", primaryErr) + + // Fallback: try the default Java key + defaultKey := "tomSun28HaHaHaHa" + result, fallbackErr := crypto.AesDecodeWithKey(encryptedPassword, defaultKey) + if fallbackErr == nil { + log.Info("password decrypted with default key", "length", len(result)) + return result, nil + } + + // Both attempts failed: hand back the original value so the system can continue, + // but report the failure rather than pretending decryption succeeded. + return encryptedPassword, fmt.Errorf("decrypt password: configured key: %w; default key: %w", primaryErr, fallbackErr) +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
