This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new a605b28 feat: add metrics collector with unit conversion and
expression evaluation (#26)
a605b28 is described below
commit a605b281c13ffacb43deb7344687a3969254a593
Author: Yang Yexuan <[email protected]>
AuthorDate: Fri Oct 24 16:31:21 2025 +0800
feat: add metrics collector with unit conversion and expression evaluation
(#26)
* feat: add metrics collector with unit conversion and expression evaluation
Implement core metrics collection framework including:
- MetricsCollector for field calculation and unit conversion
- Unit converter supporting bytes/bits/time conversions
- Expression-based field calculations
- Test suite
Add dependency: github.com/expr-lang/expr v1.16.9
* Remove package unit in License Header
* fix: keep empty unit instead of changing it to none
---
go.mod | 1 +
go.sum | 2 +
internal/collector/basic/ssh/ssh_collector.go | 15 +-
internal/collector/common/collect/dispatch/.keep | 0
.../common/collect/dispatch/metrics_collector.go | 483 ++++++++++++++
.../collect/dispatch/metrics_collector_test.go | 724 +++++++++++++++++++++
.../common/collect/lazy_message_router.go | 8 +-
internal/constants/const.go | 10 +-
internal/util/param/param_replacer.go | 55 ++
internal/util/unit/converter.go | 137 ++++
internal/util/unit/converter_test.go | 187 ++++++
11 files changed, 1606 insertions(+), 16 deletions(-)
diff --git a/go.mod b/go.mod
index 6115950..560318f 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.24.6
require (
github.com/apache/arrow/go/v13 v13.0.0
+ github.com/expr-lang/expr v1.16.9
github.com/go-logr/logr v1.4.3
github.com/go-logr/zapr v1.3.0
github.com/go-sql-driver/mysql v1.9.3
diff --git a/go.sum b/go.sum
index e914d56..2587cf1 100644
--- a/go.sum
+++ b/go.sum
@@ -21,6 +21,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod
h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/expr-lang/expr v1.16.9
h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI=
+github.com/expr-lang/expr v1.16.9/go.mod
h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod
h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
diff --git a/internal/collector/basic/ssh/ssh_collector.go
b/internal/collector/basic/ssh/ssh_collector.go
index 42734e2..39ef137 100644
--- a/internal/collector/basic/ssh/ssh_collector.go
+++ b/internal/collector/basic/ssh/ssh_collector.go
@@ -38,7 +38,7 @@ const (
// Special fields
ResponseTime = "responseTime"
- NullValue = " "
+ NullValue = consts.NullValue
// Parse types
ParseTypeOneRow = "oneRow" // Each line corresponds to one field
value, in order of aliasFields
@@ -244,13 +244,12 @@ func (sshc *SSHCollector)
createSuccessResponseWithResult(metrics *jobtypes.Metr
Metrics: metrics.Name,
Priority: 0,
Time: time.Now().UnixMilli(),
- // FIXME: use consts.CollectSuccess
- Code: 200, // Success
- Msg: "success",
- Fields: make([]jobtypes.Field, 0),
- Values: make([]jobtypes.ValueRow, 0),
- Labels: make(map[string]string),
- Metadata: make(map[string]string),
+ Code: consts.CollectSuccess, // Success
+ Msg: "success",
+ Fields: make([]jobtypes.Field, 0),
+ Values: make([]jobtypes.ValueRow, 0),
+ Labels: make(map[string]string),
+ Metadata: make(map[string]string),
}
// Extract SSH configuration to get ParseType
diff --git a/internal/collector/common/collect/dispatch/.keep
b/internal/collector/common/collect/dispatch/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go
b/internal/collector/common/collect/dispatch/metrics_collector.go
index 1f46485..1a36d52 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -20,9 +20,15 @@
package dispatch
import (
+ "encoding/json"
"fmt"
+ "strconv"
+ "strings"
"time"
+ "github.com/expr-lang/expr"
+ "github.com/expr-lang/expr/vm"
+
// Import basic package with blank identifier to trigger its init()
function
// This ensures all collector factories are registered automatically
_ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic"
@@ -30,6 +36,8 @@ import (
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/unit"
)
// MetricsCollector handles metrics collection using goroutines and channels
@@ -44,6 +52,476 @@ func NewMetricsCollector(logger logger.Logger)
*MetricsCollector {
}
}
+// CalculateFields calculates fields in metrics by configured expressions
+// This method processes the collected data and applies calculations, unit
conversions, and filters
+func (mc *MetricsCollector) CalculateFields(metrics *jobtypes.Metrics, result
*jobtypes.CollectRepMetricsData) {
+ if result == nil || metrics == nil {
+ return
+ }
+ result.Priority = metrics.Priority
+ fields := metrics.Fields
+ result.Fields = fields
+ aliasRowList := result.Values
+ if len(aliasRowList) == 0 {
+ return
+ }
+ result.Values = make([]jobtypes.ValueRow, 0)
+ aliasFields := metrics.AliasFields
+ if len(aliasFields) == 0 {
+ return
+ }
+ replacer := param.NewReplacer()
+ fieldExpressionMap, fieldAliasMap :=
mc.parseCalculates(metrics.Calculates)
+ fieldUnitConversionMap := mc.parseUnits(metrics.Units)
+
+ for _, aliasRow := range aliasRowList {
+ aliasFieldValueMap := make(map[string]string)
+ fieldValueMap := make(map[string]interface{})
+ stringTypeFieldValueMap := make(map[string]interface{})
+ aliasFieldUnitMap := make(map[string]string)
+
+ for aliasIndex, aliasField := range aliasFields {
+ if aliasIndex >= len(aliasRow.Columns) {
+ break
+ }
+ aliasFieldValue := aliasRow.Columns[aliasIndex]
+ if aliasFieldValue == constants.NullValue {
+ fieldValueMap[aliasField] = nil
+ stringTypeFieldValueMap[aliasField] = nil
+ continue
+ }
+ aliasFieldValueMap[aliasField] = aliasFieldValue
+ // Try to extract numeric value and unit
+ doubleAndUnit :=
replacer.ExtractDoubleAndUnitFromStr(aliasFieldValue)
+ if doubleAndUnit != nil {
+ fieldValueMap[aliasField] = doubleAndUnit.Value
+ if doubleAndUnit.Unit != "" {
+ aliasFieldUnitMap[aliasField] =
doubleAndUnit.Unit
+ }
+ } else {
+ fieldValueMap[aliasField] = aliasFieldValue
+ }
+ stringTypeFieldValueMap[aliasField] = aliasFieldValue
+ }
+
+ // Build the real value row based on configured fields
+ realValueRow := jobtypes.ValueRow{
+ Columns: make([]string, 0, len(fields)),
+ }
+
+ for _, field := range fields {
+ realField := field.Field
+ var value string
+
+ if program, exists := fieldExpressionMap[realField];
exists {
+ var context map[string]interface{}
+ if field.Type == constants.TypeString {
+ context = stringTypeFieldValueMap
+ } else {
+ context = fieldValueMap
+ }
+ output, err := expr.Run(program, context)
+ if err != nil {
+ mc.logger.V(1).Info("expression
evaluation failed, using original value",
+ "field", realField,
+ "error", err)
+ // Fallback to alias field value
+ aliasField := fieldAliasMap[realField]
+ if aliasField != "" {
+ value =
aliasFieldValueMap[aliasField]
+ } else {
+ value =
aliasFieldValueMap[realField]
+ }
+ } else if output != nil {
+ value = fmt.Sprintf("%v", output)
+ }
+ } else {
+ // No expression, do simple field mapping
+ aliasField := fieldAliasMap[realField]
+ if aliasField != "" {
+ value = aliasFieldValueMap[aliasField]
+ } else {
+ value = aliasFieldValueMap[realField]
+ }
+ }
+
+ // Process based on field type if value exists
+ if value != "" && value != constants.NullValue {
+ switch field.Type {
+ case constants.TypeNumber:
+ // Extract numeric value and format
+ doubleAndUnit :=
replacer.ExtractDoubleAndUnitFromStr(value)
+ if doubleAndUnit != nil {
+ numericValue :=
doubleAndUnit.Value
+ // Apply unit conversion if
configured for this field
+ if conversion, exists :=
fieldUnitConversionMap[realField]; exists {
+ // Get the original
unit from extracted value or use configured origin unit
+ originUnit :=
doubleAndUnit.Unit
+ if originUnit == "" {
+ originUnit =
conversion.OriginUnit
+ }
+ // Perform unit
conversion
+ convertedValue, err :=
unit.Convert(numericValue, originUnit, conversion.NewUnit)
+ if err != nil {
+
mc.logger.V(1).Info("unit conversion failed, using original value",
+
"field", realField,
+
"originUnit", originUnit,
+
"newUnit", conversion.NewUnit,
+
"error", err)
+ } else {
+ numericValue =
convertedValue
+
mc.logger.V(2).Info("unit conversion applied",
+
"field", realField,
+
"originalValue", doubleAndUnit.Value,
+
"originUnit", originUnit,
+
"convertedValue", convertedValue,
+
"newUnit", conversion.NewUnit)
+ }
+ }
+ value =
formatNumber(numericValue)
+ } else {
+ value = constants.NullValue
+ }
+
+ case constants.TypeTime:
+ // TODO: Implement time parsing
+ // For now, keep original value
+ }
+ }
+
+ if value == "" {
+ value = constants.NullValue
+ }
+
+ realValueRow.Columns = append(realValueRow.Columns,
value)
+
+ // Add the calculated field value to context maps for
use in subsequent expressions
+ // This allows later expressions to reference earlier
calculated fields
+ if value != constants.NullValue {
+ doubleAndUnit :=
replacer.ExtractDoubleAndUnitFromStr(value)
+ if doubleAndUnit != nil {
+ fieldValueMap[realField] =
doubleAndUnit.Value
+ } else {
+ fieldValueMap[realField] = value
+ }
+ stringTypeFieldValueMap[realField] = value
+ } else {
+ fieldValueMap[realField] = nil
+ stringTypeFieldValueMap[realField] = nil
+ }
+ }
+
+ // TODO: Apply filters based on metrics.Filters configuration
+
+ result.Values = append(result.Values, realValueRow)
+ }
+}
+
+// parseCalculates parses the calculates configuration and compiles expressions
+// Returns a map of field name to compiled expression program, and a map of
field name to alias field
+// Supports two formats:
+// 1. String array format: []string - "field=expression" or "field=aliasField"
+// Example: "usage=(used / total) * 100" or "total=total"
+// 2. Calculate struct array format: []Calculate - {field: "field", script:
"expression", aliasField: "alias"}
+// 3. Mixed interface array: []interface{} - can contain both strings and
Calculate structs
+func (mc *MetricsCollector) parseCalculates(calculates interface{})
(map[string]*vm.Program, map[string]string) {
+ fieldExpressionMap := make(map[string]*vm.Program)
+ fieldAliasMap := make(map[string]string)
+
+ if calculates == nil {
+ return fieldExpressionMap, fieldAliasMap
+ }
+
+ switch v := calculates.(type) {
+ case []string:
+ mc.parseCalculatesFromStringArray(v, fieldExpressionMap,
fieldAliasMap)
+
+ case []jobtypes.Calculate:
+ mc.parseCalculatesFromStructArray(v, fieldExpressionMap,
fieldAliasMap)
+
+ case []interface{}:
+ // Process each item individually to support mixed formats
+ for _, item := range v {
+ if str, ok := item.(string); ok {
+ // Handle string format: "field=expression" or
"field=aliasField"
+ parts := splitCalculateString(str)
+ if len(parts) == 2 {
+ field := parts[0]
+ expression := parts[1]
+
+ if expression == field ||
isSimpleFieldName(expression) {
+ fieldAliasMap[field] =
expression
+ } else {
+ program, err :=
expr.Compile(expression)
+ if err != nil {
+ mc.logger.Error(err,
"failed to compile expression",
+ "field", field,
+ "expression",
expression)
+ fieldAliasMap[field] =
expression
+ continue
+ }
+ fieldExpressionMap[field] =
program
+ }
+ }
+ } else if calcMap, ok := item.(map[string]interface{});
ok {
+ // Handle map format: {"field": "fieldName",
"aliasField": "alias", "script": "expression"}
+ calc := jobtypes.Calculate{}
+ if field, ok := calcMap["field"].(string); ok {
+ calc.Field = field
+ }
+ if script, ok := calcMap["script"].(string); ok
{
+ calc.Script = script
+ }
+ if aliasField, ok :=
calcMap["aliasField"].(string); ok {
+ calc.AliasField = aliasField
+ }
+
+ if calc.Field != "" {
+ if calc.Script != "" {
+ program, err :=
expr.Compile(calc.Script)
+ if err != nil {
+ mc.logger.Error(err,
"failed to compile expression",
+ "field",
calc.Field,
+ "script",
calc.Script)
+ continue
+ }
+ fieldExpressionMap[calc.Field]
= program
+ }
+ if calc.AliasField != "" {
+ fieldAliasMap[calc.Field] =
calc.AliasField
+ }
+ }
+ }
+ }
+
+ default:
+ // Try to unmarshal from JSON as last resort
+ if jsonData, err := json.Marshal(calculates); err == nil {
+ var stringArray []string
+ if err := json.Unmarshal(jsonData, &stringArray); err
== nil {
+ mc.parseCalculatesFromStringArray(stringArray,
fieldExpressionMap, fieldAliasMap)
+ } else {
+ var calculateList []jobtypes.Calculate
+ if err := json.Unmarshal(jsonData,
&calculateList); err == nil {
+
mc.parseCalculatesFromStructArray(calculateList, fieldExpressionMap,
fieldAliasMap)
+ }
+ }
+ }
+ }
+
+ return fieldExpressionMap, fieldAliasMap
+}
+
+// parseUnits parses the units configuration for unit conversions
+// Supports three formats:
+// 1. String array format: []string - "fieldName=originUnit->newUnit"
+// Example: "committed=B->MB"
+// 2. Unit struct array format: []Unit - {field: "field", unit:
"originUnit->newUnit"}
+// 3. Mixed interface array: []interface{} - can contain both strings and
Unit structs
+func (mc *MetricsCollector) parseUnits(units interface{})
map[string]*unit.Conversion {
+ fieldUnitConversionMap := make(map[string]*unit.Conversion)
+
+ if units == nil {
+ return fieldUnitConversionMap
+ }
+
+ switch v := units.(type) {
+ case []string:
+ mc.parseUnitsFromStringArray(v, fieldUnitConversionMap)
+
+ case []jobtypes.Unit:
+ mc.parseUnitsFromStructArray(v, fieldUnitConversionMap)
+
+ case []interface{}:
+ // Process each item individually to support mixed formats
+ for _, item := range v {
+ if str, ok := item.(string); ok {
+ // Handle string format:
"fieldName=originUnit->newUnit"
+ conversion, err := unit.ParseConversion(str)
+ if err != nil {
+ mc.logger.V(1).Info("invalid unit
conversion format, skipping",
+ "unitStr", str,
+ "error", err)
+ continue
+ }
+ fieldUnitConversionMap[conversion.Field] =
conversion
+ } else if unitMap, ok := item.(map[string]interface{});
ok {
+ // Handle map format: {"field": "fieldName",
"unit": "originUnit->newUnit"}
+ unitStruct := jobtypes.Unit{}
+ if field, ok := unitMap["field"].(string); ok {
+ unitStruct.Field = field
+ }
+ if unitStr, ok := unitMap["unit"].(string); ok {
+ unitStruct.Unit = unitStr
+ }
+
+ if unitStruct.Field != "" && unitStruct.Unit !=
"" {
+ // Parse the unit conversion string
+ conversionStr := unitStruct.Field + "="
+ unitStruct.Unit
+ conversion, err :=
unit.ParseConversion(conversionStr)
+ if err != nil {
+ mc.logger.V(1).Info("invalid
unit conversion format, skipping",
+ "field",
unitStruct.Field,
+ "unit", unitStruct.Unit,
+ "error", err)
+ continue
+ }
+
fieldUnitConversionMap[conversion.Field] = conversion
+ }
+ }
+ }
+
+ default:
+ // Try to unmarshal from JSON as last resort
+ if jsonData, err := json.Marshal(units); err == nil {
+ var stringArray []string
+ if err := json.Unmarshal(jsonData, &stringArray); err
== nil {
+ mc.parseUnitsFromStringArray(stringArray,
fieldUnitConversionMap)
+ } else {
+ var unitList []jobtypes.Unit
+ if err := json.Unmarshal(jsonData, &unitList);
err == nil {
+ mc.parseUnitsFromStructArray(unitList,
fieldUnitConversionMap)
+ }
+ }
+ }
+ }
+
+ return fieldUnitConversionMap
+}
+
+// parseUnitsFromStringArray parses units from string array format
+// Format: "fieldName=originUnit->newUnit"
+// Example: "committed=B->MB"
+func (mc *MetricsCollector) parseUnitsFromStringArray(units []string,
fieldUnitConversionMap map[string]*unit.Conversion) {
+ for _, unitStr := range units {
+ conversion, err := unit.ParseConversion(unitStr)
+ if err != nil {
+ mc.logger.V(1).Info("invalid unit conversion format,
skipping",
+ "unitStr", unitStr,
+ "error", err)
+ continue
+ }
+ fieldUnitConversionMap[conversion.Field] = conversion
+ }
+}
+
+// parseUnitsFromStructArray parses units from Unit struct array
+func (mc *MetricsCollector) parseUnitsFromStructArray(units []jobtypes.Unit,
fieldUnitConversionMap map[string]*unit.Conversion) {
+ for _, u := range units {
+ if u.Field == "" || u.Unit == "" {
+ continue
+ }
+ // Parse the unit conversion string
+ conversionStr := u.Field + "=" + u.Unit
+ conversion, err := unit.ParseConversion(conversionStr)
+ if err != nil {
+ mc.logger.V(1).Info("invalid unit conversion format,
skipping",
+ "field", u.Field,
+ "unit", u.Unit,
+ "error", err)
+ continue
+ }
+ fieldUnitConversionMap[conversion.Field] = conversion
+ }
+}
+
+// parseCalculatesFromStringArray parses calculates from string array format
+// Format: "field=expression" where expression can be a simple field name or a
calculation
+// Examples:
+// - "total=total" (simple field mapping)
+// - "usage=(used / total) * 100" (expression calculation)
+func (mc *MetricsCollector) parseCalculatesFromStringArray(calculates []string,
+ fieldExpressionMap map[string]*vm.Program, fieldAliasMap
map[string]string) {
+
+ for _, calc := range calculates {
+ parts := splitCalculateString(calc)
+ if len(parts) != 2 {
+ mc.logger.V(1).Info("invalid calculate format,
skipping",
+ "calculate", calc)
+ continue
+ }
+
+ field := parts[0]
+ expression := parts[1]
+
+ // Check if expression is just a simple field name (alias
mapping)
+ // If expression equals field name, it's a direct mapping
+ if expression == field || isSimpleFieldName(expression) {
+ fieldAliasMap[field] = expression
+ } else {
+ program, err := expr.Compile(expression)
+ if err != nil {
+ mc.logger.Error(err, "failed to compile
expression",
+ "field", field,
+ "expression", expression)
+ fieldAliasMap[field] = expression
+ continue
+ }
+ fieldExpressionMap[field] = program
+ }
+ }
+}
+
+// parseCalculatesFromStructArray parses calculates from Calculate struct array
+func (mc *MetricsCollector) parseCalculatesFromStructArray(calculates
[]jobtypes.Calculate,
+ fieldExpressionMap map[string]*vm.Program, fieldAliasMap
map[string]string) {
+
+ for _, calc := range calculates {
+ if calc.Script != "" {
+ program, err := expr.Compile(calc.Script)
+ if err != nil {
+ mc.logger.Error(err, "failed to compile
expression",
+ "field", calc.Field,
+ "script", calc.Script)
+ continue
+ }
+ fieldExpressionMap[calc.Field] = program
+ }
+ if calc.AliasField != "" {
+ fieldAliasMap[calc.Field] = calc.AliasField
+ }
+ }
+}
+
+// splitCalculateString splits "field=expression" into [field, expression]
+// Simply splits by the first '=' character
+func splitCalculateString(calc string) []string {
+ idx := strings.Index(calc, "=")
+ if idx == -1 {
+ return []string{calc}
+ }
+
+ field := calc[:idx]
+ expression := calc[idx+1:]
+ return []string{field, expression}
+}
+
+// isSimpleFieldName checks if a string is a simple field name (alphanumeric
and underscore only)
+func isSimpleFieldName(s string) bool {
+ if s == "" {
+ return false
+ }
+ for _, ch := range s {
+ if !((ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') ||
+ (ch >= '0' && ch <= '9') || ch == '_') {
+ return false
+ }
+ }
+ return true
+}
+
+// formatNumber formats a float64 to string with up to 4 decimal places
+func formatNumber(value float64) string {
+ formatted := strconv.FormatFloat(value, 'f', 4, 64)
+
+ // Remove trailing zeros after decimal point
+ if len(formatted) > 0 && (formatted[len(formatted)-1] == '0' ||
formatted[len(formatted)-1] == '.') {
+ formatted = strconv.FormatFloat(value, 'f', -1, 64)
+ }
+
+ return formatted
+}
+
// CollectMetrics collects metrics using the appropriate collector and returns
results via channel
// This method uses Go's channel-based concurrency instead of Java's thread
pools
func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job
*jobtypes.Job, timeout *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData {
@@ -75,6 +553,11 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
// Perform the actual collection with metrics (parameters
should already be replaced by CommonDispatcher)
result := collector.Collect(metrics)
+ // calculate fields and convert units by metrics.Calculates and
metrics.Units
+ if result != nil && result.Code == constants.CollectSuccess {
+ mc.CalculateFields(metrics, result)
+ }
+
// Enrich result with job information
if result != nil {
result.ID = job.MonitorID // Use MonitorID for both ID
and MonitorID fields
diff --git
a/internal/collector/common/collect/dispatch/metrics_collector_test.go
b/internal/collector/common/collect/dispatch/metrics_collector_test.go
new file mode 100644
index 0000000..07186a3
--- /dev/null
+++ b/internal/collector/common/collect/dispatch/metrics_collector_test.go
@@ -0,0 +1,724 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package dispatch
+
+import (
+ "testing"
+
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+func TestCalculateFieldsWithUnitConversion(t *testing.T) {
+ // Create a logger for testing
+ log := logger.DefaultLogger(nil, loggertype.LogLevelInfo)
+
+ // Create metrics collector
+ mc := NewMetricsCollector(log)
+
+ tests := []struct {
+ name string
+ metrics *jobtypes.Metrics
+ collectResult *jobtypes.CollectRepMetricsData
+ expectedValues [][]string
+ }{
+ {
+ name: "byte to megabyte conversion",
+ metrics: &jobtypes.Metrics{
+ Name: "memory",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "total", Type:
constants.TypeNumber},
+ {Field: "used", Type:
constants.TypeNumber},
+ {Field: "free", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"total", "used", "free"},
+ Units: []string{"total=B->MB",
"used=B->MB", "free=B->MB"},
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"1048576", "524288",
"524288"}}, // 1MB, 0.5MB, 0.5MB in bytes
+ },
+ },
+ expectedValues: [][]string{
+ {"1", "0.5", "0.5"}, // Converted to MB
+ },
+ },
+ {
+ name: "mixed unit conversion",
+ metrics: &jobtypes.Metrics{
+ Name: "disk",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "size", Type:
constants.TypeNumber},
+ {Field: "usage", Type:
constants.TypeString},
+ },
+ AliasFields: []string{"size", "usage"},
+ Units: []string{"size=KB->GB"},
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"1073741824",
"50%"}}, // 1GB in KB, 50%
+ },
+ },
+ expectedValues: [][]string{
+ {"1024", "50%"}, // size converted to GB, usage
unchanged
+ },
+ },
+ {
+ name: "with unit in value string",
+ metrics: &jobtypes.Metrics{
+ Name: "memory",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "committed", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"committed"},
+ Units: []string{"committed=B->MB"},
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"2097152B"}}, // 2MB
in bytes with unit
+ },
+ },
+ expectedValues: [][]string{
+ {"2"}, // Converted to MB
+ },
+ },
+ {
+ name: "no unit conversion",
+ metrics: &jobtypes.Metrics{
+ Name: "cpu",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "usage", Type:
constants.TypeNumber},
+ {Field: "cores", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"usage", "cores"},
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"75.5", "4"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"75.5", "4"}, // No conversion
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Execute CalculateFields
+ mc.CalculateFields(tt.metrics, tt.collectResult)
+
+ // Verify results
+ if len(tt.collectResult.Values) !=
len(tt.expectedValues) {
+ t.Errorf("CalculateFields() resulted in %d
rows, expected %d",
+ len(tt.collectResult.Values),
len(tt.expectedValues))
+ return
+ }
+
+ for i, row := range tt.collectResult.Values {
+ expectedRow := tt.expectedValues[i]
+ if len(row.Columns) != len(expectedRow) {
+ t.Errorf("Row %d has %d columns,
expected %d",
+ i, len(row.Columns),
len(expectedRow))
+ continue
+ }
+
+ for j, value := range row.Columns {
+ expected := expectedRow[j]
+ if value != expected {
+ t.Errorf("Row %d, Column %d =
%v, expected %v",
+ i, j, value, expected)
+ }
+ }
+ }
+ })
+ }
+}
+
+func TestCalculateFieldsWithCalculates(t *testing.T) {
+ // Create a logger for testing
+ log := logger.DefaultLogger(nil, loggertype.LogLevelInfo)
+
+ // Create metrics collector
+ mc := NewMetricsCollector(log)
+
+ tests := []struct {
+ name string
+ metrics *jobtypes.Metrics
+ collectResult *jobtypes.CollectRepMetricsData
+ expectedValues [][]string
+ }{
+ {
+ name: "calculates with string array - simple field
mapping",
+ metrics: &jobtypes.Metrics{
+ Name: "memory",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "total", Type:
constants.TypeNumber},
+ {Field: "used", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"total_bytes",
"used_bytes"},
+ Calculates: []string{"total=total_bytes",
"used=used_bytes"},
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"1024", "512"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"1024", "512"},
+ },
+ },
+ {
+ name: "calculates with string array - expression",
+ metrics: &jobtypes.Metrics{
+ Name: "memory",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "total", Type:
constants.TypeNumber},
+ {Field: "used", Type:
constants.TypeNumber},
+ {Field: "usage", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"total", "used"},
+ Calculates: []string{"usage=(used / total) *
100"},
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"1024", "512"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"1024", "512", "50"},
+ },
+ },
+ {
+ name: "calculates with string array - mixed mapping and
expression",
+ metrics: &jobtypes.Metrics{
+ Name: "disk",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "total", Type:
constants.TypeNumber},
+ {Field: "used", Type:
constants.TypeNumber},
+ {Field: "free", Type:
constants.TypeNumber},
+ {Field: "usage_percent", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"total_space",
"used_space", "free_space", "usage"},
+ Calculates: []string{
+ "total=total_space",
+ "used=used_space",
+ "free=free_space",
+ "usage_percent=usage",
+ },
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"2048", "1024",
"1024", "50"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"2048", "1024", "1024", "50"},
+ },
+ },
+ {
+ name: "calculates with Calculate struct array",
+ metrics: &jobtypes.Metrics{
+ Name: "memory",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "total", Type:
constants.TypeNumber},
+ {Field: "used", Type:
constants.TypeNumber},
+ {Field: "free", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"total_mb", "used_mb",
"free_mb"},
+ Calculates: []jobtypes.Calculate{
+ {Field: "total", AliasField:
"total_mb"},
+ {Field: "used", AliasField: "used_mb"},
+ {Field: "free", AliasField: "free_mb"},
+ },
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"4096", "2048",
"2048"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"4096", "2048", "2048"},
+ },
+ },
+ {
+ name: "calculates with Calculate struct array - complex
expressions",
+ metrics: &jobtypes.Metrics{
+ Name: "jvm",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "heap_max", Type:
constants.TypeNumber},
+ {Field: "heap_used", Type:
constants.TypeNumber},
+ {Field: "heap_free", Type:
constants.TypeNumber},
+ {Field: "heap_usage", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"max", "used", "free",
"usage"},
+ Calculates: []jobtypes.Calculate{
+ {Field: "heap_max", AliasField: "max"},
+ {Field: "heap_used", AliasField:
"used"},
+ {Field: "heap_free", AliasField:
"free"},
+ {Field: "heap_usage", AliasField:
"usage"},
+ },
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"8192", "6144",
"2048", "75"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"8192", "6144", "2048", "75"},
+ },
+ },
+ {
+ name: "calculates with interface array (string format)",
+ metrics: &jobtypes.Metrics{
+ Name: "cpu",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "idle", Type:
constants.TypeNumber},
+ {Field: "usage", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"idle_percent",
"usage_percent"},
+ Calculates: []interface{}{"idle=idle_percent",
"usage=usage_percent"},
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"25", "75"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"25", "75"},
+ },
+ },
+ {
+ name: "calculates with interface array (Calculate
struct format)",
+ metrics: &jobtypes.Metrics{
+ Name: "network",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "rx_bytes", Type:
constants.TypeNumber},
+ {Field: "tx_bytes", Type:
constants.TypeNumber},
+ {Field: "total_bytes", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"received",
"transmitted", "total"},
+ Calculates: []interface{}{
+ map[string]interface{}{"field":
"rx_bytes", "aliasField": "received"},
+ map[string]interface{}{"field":
"tx_bytes", "aliasField": "transmitted"},
+ map[string]interface{}{"field":
"total_bytes", "aliasField": "total"},
+ },
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"1024", "2048",
"3072"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"1024", "2048", "3072"},
+ },
+ },
+ {
+ name: "calculates with interface array (mixed string
and map)",
+ metrics: &jobtypes.Metrics{
+ Name: "system",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "load1", Type:
constants.TypeNumber},
+ {Field: "load5", Type:
constants.TypeNumber},
+ {Field: "load15", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"load_1min", "load_5min",
"load_15min"},
+ Calculates: []interface{}{
+ "load1=load_1min",
+ map[string]interface{}{"field":
"load5", "aliasField": "load_5min"},
+ "load15=load_15min",
+ },
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"1.5", "2.3",
"3.1"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"1.5", "2.3", "3.1"},
+ },
+ },
+ {
+ name: "calculates with interface array (expression in
string)",
+ metrics: &jobtypes.Metrics{
+ Name: "database",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "connections", Type:
constants.TypeNumber},
+ {Field: "max_connections", Type:
constants.TypeNumber},
+ {Field: "usage_rate", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"active", "max"},
+ Calculates: []interface{}{
+ "connections=active",
+ "max_connections=max",
+ "usage_rate=(connections /
max_connections) * 100",
+ },
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"75", "100"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"75", "100", "75"},
+ },
+ },
+ {
+ name: "calculates with interface array (script in map)",
+ metrics: &jobtypes.Metrics{
+ Name: "storage",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "total", Type:
constants.TypeNumber},
+ {Field: "used", Type:
constants.TypeNumber},
+ {Field: "available", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"capacity", "occupied",
"free_space"},
+ Calculates: []interface{}{
+ map[string]interface{}{"field":
"total", "aliasField": "capacity"},
+ map[string]interface{}{"field": "used",
"aliasField": "occupied"},
+ map[string]interface{}{"field":
"available", "aliasField": "free_space"},
+ },
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"10240", "7168",
"3072"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"10240", "7168", "3072"},
+ },
+ },
+ {
+ name: "no calculates configuration",
+ metrics: &jobtypes.Metrics{
+ Name: "simple",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "value", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"value"},
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"100"}},
+ },
+ },
+ expectedValues: [][]string{
+ {"100"},
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Execute CalculateFields
+ mc.CalculateFields(tt.metrics, tt.collectResult)
+
+ // Verify results
+ if len(tt.collectResult.Values) !=
len(tt.expectedValues) {
+ t.Errorf("CalculateFields() resulted in %d
rows, expected %d",
+ len(tt.collectResult.Values),
len(tt.expectedValues))
+ return
+ }
+
+ for i, row := range tt.collectResult.Values {
+ expectedRow := tt.expectedValues[i]
+ if len(row.Columns) != len(expectedRow) {
+ t.Errorf("Row %d has %d columns,
expected %d",
+ i, len(row.Columns),
len(expectedRow))
+ continue
+ }
+
+ for j, value := range row.Columns {
+ expected := expectedRow[j]
+ if value != expected {
+ t.Errorf("Row %d, Column %d =
%v, expected %v",
+ i, j, value, expected)
+ }
+ }
+ }
+ })
+ }
+}
+
+func TestParseUnits(t *testing.T) {
+ // Create a logger for testing
+ log := logger.DefaultLogger(nil, loggertype.LogLevelInfo)
+
+ // Create metrics collector
+ mc := NewMetricsCollector(log)
+
+ tests := []struct {
+ name string
+ units interface{}
+ expectedCount int
+ expectedField string
+ expectedFrom string
+ expectedTo string
+ }{
+ {
+ name: "string array format",
+ units: []string{"committed=B->MB",
"max=KB->GB"},
+ expectedCount: 2,
+ expectedField: "committed",
+ expectedFrom: "B",
+ expectedTo: "MB",
+ },
+ {
+ name: "interface array format with strings",
+ units: []interface{}{"memory=B->MB"},
+ expectedCount: 1,
+ expectedField: "memory",
+ expectedFrom: "B",
+ expectedTo: "MB",
+ },
+ {
+ name: "Unit struct array format",
+ units: []jobtypes.Unit{
+ {Field: "total", Unit: "B->MB"},
+ {Field: "used", Unit: "KB->GB"},
+ },
+ expectedCount: 2,
+ expectedField: "total",
+ expectedFrom: "B",
+ expectedTo: "MB",
+ },
+ {
+ name: "interface array format with Unit maps",
+ units: []interface{}{
+ map[string]interface{}{"field": "heap", "unit":
"B->MB"},
+ map[string]interface{}{"field": "stack",
"unit": "KB->MB"},
+ },
+ expectedCount: 2,
+ expectedField: "heap",
+ expectedFrom: "B",
+ expectedTo: "MB",
+ },
+ {
+ name: "mixed interface array (strings and maps)",
+ units: []interface{}{
+ "total=B->GB",
+ map[string]interface{}{"field": "used", "unit":
"KB->MB"},
+ "free=MB->GB",
+ },
+ expectedCount: 3,
+ expectedField: "used",
+ expectedFrom: "KB",
+ expectedTo: "MB",
+ },
+ {
+ name: "nil units",
+ units: nil,
+ expectedCount: 0,
+ },
+ {
+ name: "empty array",
+ units: []string{},
+ expectedCount: 0,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := mc.parseUnits(tt.units)
+
+ if len(result) != tt.expectedCount {
+ t.Errorf("parseUnits() resulted in %d
conversions, expected %d",
+ len(result), tt.expectedCount)
+ return
+ }
+
+ if tt.expectedCount > 0 && tt.expectedField != "" {
+ conversion, exists := result[tt.expectedField]
+ if !exists {
+ t.Errorf("Expected field %s not found
in result", tt.expectedField)
+ return
+ }
+
+ if conversion.OriginUnit != tt.expectedFrom {
+ t.Errorf("OriginUnit = %v, expected
%v", conversion.OriginUnit, tt.expectedFrom)
+ }
+
+ if conversion.NewUnit != tt.expectedTo {
+ t.Errorf("NewUnit = %v, expected %v",
conversion.NewUnit, tt.expectedTo)
+ }
+ }
+ })
+ }
+}
+
+func TestCalculateFieldsWithUnitStructFormat(t *testing.T) {
+ // Create a logger for testing
+ log := logger.DefaultLogger(nil, loggertype.LogLevelInfo)
+
+ // Create metrics collector
+ mc := NewMetricsCollector(log)
+
+ tests := []struct {
+ name string
+ metrics *jobtypes.Metrics
+ collectResult *jobtypes.CollectRepMetricsData
+ expectedValues [][]string
+ }{
+ {
+ name: "Unit struct array format",
+ metrics: &jobtypes.Metrics{
+ Name: "memory",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "total", Type:
constants.TypeNumber},
+ {Field: "used", Type:
constants.TypeNumber},
+ {Field: "free", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"total", "used", "free"},
+ Units: []jobtypes.Unit{
+ {Field: "total", Unit: "B->MB"},
+ {Field: "used", Unit: "B->MB"},
+ {Field: "free", Unit: "B->MB"},
+ },
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"2097152",
"1048576", "1048576"}}, // 2MB, 1MB, 1MB in bytes
+ },
+ },
+ expectedValues: [][]string{
+ {"2", "1", "1"}, // Converted to MB
+ },
+ },
+ {
+ name: "mixed interface array with Unit maps and
strings",
+ metrics: &jobtypes.Metrics{
+ Name: "storage",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "disk_total", Type:
constants.TypeNumber},
+ {Field: "disk_used", Type:
constants.TypeNumber},
+ {Field: "disk_free", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"disk_total",
"disk_used", "disk_free"},
+ Units: []interface{}{
+ map[string]interface{}{"field":
"disk_total", "unit": "KB->GB"},
+ "disk_used=KB->GB",
+ map[string]interface{}{"field":
"disk_free", "unit": "KB->GB"},
+ },
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"1048576", "524288",
"524288"}}, // 1GB, 0.5GB, 0.5GB in KB
+ },
+ },
+ expectedValues: [][]string{
+ {"1", "0.5", "0.5"}, // Converted to GB
+ },
+ },
+ {
+ name: "Unit struct with calculates",
+ metrics: &jobtypes.Metrics{
+ Name: "jvm",
+ Priority: 0,
+ Fields: []jobtypes.Field{
+ {Field: "heap_max", Type:
constants.TypeNumber},
+ {Field: "heap_used", Type:
constants.TypeNumber},
+ {Field: "heap_usage", Type:
constants.TypeNumber},
+ },
+ AliasFields: []string{"max_bytes",
"used_bytes"},
+ Calculates: []string{"heap_max=max_bytes",
"heap_used=used_bytes", "heap_usage=(heap_used / heap_max) * 100"},
+ Units: []jobtypes.Unit{
+ {Field: "heap_max", Unit: "B->MB"},
+ {Field: "heap_used", Unit: "B->MB"},
+ },
+ },
+ collectResult: &jobtypes.CollectRepMetricsData{
+ Code: constants.CollectSuccess,
+ Values: []jobtypes.ValueRow{
+ {Columns: []string{"4194304",
"3145728"}}, // 4MB, 3MB in bytes
+ },
+ },
+ expectedValues: [][]string{
+ {"4", "3", "75"}, // Converted to MB, usage
calculated
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Execute CalculateFields
+ mc.CalculateFields(tt.metrics, tt.collectResult)
+
+ // Verify results
+ if len(tt.collectResult.Values) !=
len(tt.expectedValues) {
+ t.Errorf("CalculateFields() resulted in %d
rows, expected %d",
+ len(tt.collectResult.Values),
len(tt.expectedValues))
+ return
+ }
+
+ for i, row := range tt.collectResult.Values {
+ expectedRow := tt.expectedValues[i]
+ if len(row.Columns) != len(expectedRow) {
+ t.Errorf("Row %d has %d columns,
expected %d",
+ i, len(row.Columns),
len(expectedRow))
+ continue
+ }
+
+ for j, value := range row.Columns {
+ expected := expectedRow[j]
+ if value != expected {
+ t.Errorf("Row %d, Column %d =
%v, expected %v",
+ i, j, value, expected)
+ }
+ }
+ }
+ })
+ }
+}
diff --git a/internal/collector/common/collect/lazy_message_router.go
b/internal/collector/common/collect/lazy_message_router.go
index 3ad76e1..a038099 100644
--- a/internal/collector/common/collect/lazy_message_router.go
+++ b/internal/collector/common/collect/lazy_message_router.go
@@ -186,12 +186,6 @@ func (l *LazyMessageRouter) createArrowRecordBatch(mem
memory.Allocator, data *j
// Add dynamic fields (based on collected field definitions)
dynamicFields := make([]arrow.Field, 0, len(data.Fields))
for _, field := range data.Fields {
- // Ensure unit is not empty
- unitValue := field.Unit
- if unitValue == "" {
- unitValue = "none"
- }
-
// Create field metadata
typeValue := fmt.Sprintf("%d", field.Type)
labelValue := fmt.Sprintf("%t", field.Label)
@@ -199,7 +193,7 @@ func (l *LazyMessageRouter) createArrowRecordBatch(mem
memory.Allocator, data *j
fieldMetadata := arrow.MetadataFrom(map[string]string{
"type": typeValue,
"label": labelValue,
- "unit": unitValue,
+ "unit": field.Unit,
})
dynamicFields = append(dynamicFields, arrow.Field{
diff --git a/internal/constants/const.go b/internal/constants/const.go
index 1376097..6895fcb 100644
--- a/internal/constants/const.go
+++ b/internal/constants/const.go
@@ -19,7 +19,15 @@ package constants
// System constants
const (
- KeyWord = "keyword"
+ KeyWord = "keyword"
+ NullValue = " "
+)
+
+// Field type constants
+const (
+ TypeNumber = 0
+ TypeString = 1
+ TypeTime = 2
)
// Service related constants
diff --git a/internal/util/param/param_replacer.go
b/internal/util/param/param_replacer.go
index 1b23781..4346e05 100644
--- a/internal/util/param/param_replacer.go
+++ b/internal/util/param/param_replacer.go
@@ -395,3 +395,58 @@ func (r *Replacer) decryptPassword(encryptedPassword
string) (string, error) {
log.Info("all decryption attempts failed, using original value")
return encryptedPassword, nil
}
+
+// DoubleAndUnit represents a numeric value with its unit
+type DoubleAndUnit struct {
+ Value float64
+ Unit string
+}
+
+// Common unit symbols to check for
+// Order matters: longer strings should come before shorter ones to avoid
partial matches
+// e.g., "Ki" before "K", "Mi" before "M", "Gi" before "G"
+var unitSymbols = []string{
+ "%", "Gi", "Mi", "Ki", "G", "g", "M", "m", "K", "k", "B", "b",
+}
+
+// ExtractDoubleAndUnitFromStr extracts a double value and unit from a string
+// Examples:
+// - "123.45" -> {Value: 123.45, Unit: ""}
+// - "23.43GB" -> {Value: 23.43, Unit: "GB"}
+// - "33KB" -> {Value: 33, Unit: "KB"}
+// - "99%" -> {Value: 99, Unit: "%"}
+// - "MB" -> {Value: 0, Unit: "MB"}
+func (r *Replacer) ExtractDoubleAndUnitFromStr(str string) *DoubleAndUnit {
+ if str == "" {
+ return nil
+ }
+
+ str = strings.TrimSpace(str)
+ doubleAndUnit := &DoubleAndUnit{}
+
+ if value, err := strconv.ParseFloat(str, 64); err == nil {
+ doubleAndUnit.Value = value
+ return doubleAndUnit
+ }
+
+ for _, unitSymbol := range unitSymbols {
+ index := strings.Index(str, unitSymbol)
+
+ if index == 0 {
+ doubleAndUnit.Value = 0
+ doubleAndUnit.Unit = strings.TrimSpace(str)
+ return doubleAndUnit
+ }
+
+ if index > 0 {
+ numericPart := str[:index]
+ if value, err :=
strconv.ParseFloat(strings.TrimSpace(numericPart), 64); err == nil {
+ doubleAndUnit.Value = value
+ doubleAndUnit.Unit =
strings.TrimSpace(str[index:])
+ return doubleAndUnit
+ }
+ }
+ }
+
+ return nil
+}
diff --git a/internal/util/unit/converter.go b/internal/util/unit/converter.go
new file mode 100644
index 0000000..e56b325
--- /dev/null
+++ b/internal/util/unit/converter.go
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package unit
+
+import (
+ "fmt"
+ "strings"
+)
+
// Conversion represents a unit conversion configuration
type Conversion struct {
	Field      string
	OriginUnit string
	NewUnit    string
}

// ParseConversion parses a unit conversion string of the form
// "fieldName=originUnit->newUnit", e.g. "committed=B->MB".
// Whitespace around each component is trimmed; any other shape
// (missing '=' or '->', or extra separators) yields an error.
func ParseConversion(unitStr string) (*Conversion, error) {
	// Exactly one '=' separates the field name from the conversion spec.
	segments := strings.Split(unitStr, "=")
	if len(segments) != 2 {
		return nil, fmt.Errorf("invalid unit conversion format: %s, expected 'fieldName=originUnit->newUnit'", unitStr)
	}

	field := strings.TrimSpace(segments[0])
	spec := strings.TrimSpace(segments[1])

	// Exactly one '->' separates origin unit from target unit.
	units := strings.Split(spec, "->")
	if len(units) != 2 {
		return nil, fmt.Errorf("invalid unit conversion format: %s, expected 'originUnit->newUnit'", spec)
	}

	return &Conversion{
		Field:      field,
		OriginUnit: strings.TrimSpace(units[0]),
		NewUnit:    strings.TrimSpace(units[1]),
	}, nil
}
+
// byteConvFactors maps byte-based unit names (upper-cased) to their size
// in bytes. Both SI-style (KB, MB, ...) and binary (KiB, MiB, ...) names
// use 1024-based factors, matching the original collector behavior.
// Hoisted to package level so Convert does not rebuild the tables on
// every call (this runs once per converted metric value).
var byteConvFactors = map[string]float64{
	"B":  1,
	"KB": 1024,
	"MB": 1024 * 1024,
	"GB": 1024 * 1024 * 1024,
	"TB": 1024 * 1024 * 1024 * 1024,
	"PB": 1024 * 1024 * 1024 * 1024 * 1024,
	// Binary versions
	"KIB": 1024,
	"MIB": 1024 * 1024,
	"GIB": 1024 * 1024 * 1024,
	"TIB": 1024 * 1024 * 1024 * 1024,
	"PIB": 1024 * 1024 * 1024 * 1024 * 1024,
}

// bitConvFactors maps bit-based unit names to their size in bits.
// NOTE(review): the "B"/"KB"/... entries duplicate keys in byteConvFactors
// and are only reachable when the *other* unit in the pair is a *BIT name
// (byte conversion is tried first); in that case "B" is treated as one bit,
// not one byte — confirm this matches the upstream HertzBeat semantics.
var bitConvFactors = map[string]float64{
	"B":    1,
	"KB":   1000,
	"MB":   1000 * 1000,
	"GB":   1000 * 1000 * 1000,
	"TB":   1000 * 1000 * 1000 * 1000,
	"PB":   1000 * 1000 * 1000 * 1000 * 1000,
	"KBIT": 1000,
	"MBIT": 1000 * 1000,
	"GBIT": 1000 * 1000 * 1000,
	"TBIT": 1000 * 1000 * 1000 * 1000,
	"PBIT": 1000 * 1000 * 1000 * 1000 * 1000,
}

// timeConvFactors maps time unit names to nanoseconds.
var timeConvFactors = map[string]float64{
	"NS":  1,
	"US":  1000,
	"MS":  1000_000,
	"S":   1000_000_000,
	"MIN": 60_000_000_000,
	"H":   3600_000_000_000,
	"D":   86_400_000_000_000,
}

// Convert converts a value from originUnit to newUnit.
// Unit names are case-insensitive. Both units must belong to the same
// family; families are tried in a fixed order (bytes, then bits, then
// time), so a pair matching an earlier family never falls through to a
// later one. An unsupported pair returns an error.
func Convert(value float64, originUnit, newUnit string) (float64, error) {
	// Normalize unit names to uppercase for comparison
	originUnit = strings.ToUpper(originUnit)
	newUnit = strings.ToUpper(newUnit)

	// If units are the same, no conversion needed
	if originUnit == newUnit {
		return value, nil
	}

	// Try each family in order; both units must resolve in the same table.
	for _, factors := range []map[string]float64{byteConvFactors, bitConvFactors, timeConvFactors} {
		originFactor, originOk := factors[originUnit]
		newFactor, newOk := factors[newUnit]
		if originOk && newOk {
			return value * originFactor / newFactor, nil
		}
	}

	return 0, fmt.Errorf("unsupported unit conversion from %s to %s", originUnit, newUnit)
}
diff --git a/internal/util/unit/converter_test.go
b/internal/util/unit/converter_test.go
new file mode 100644
index 0000000..6f2be99
--- /dev/null
+++ b/internal/util/unit/converter_test.go
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package unit
+
+import (
+ "testing"
+)
+
+func TestParseConversion(t *testing.T) {
+ tests := []struct {
+ name string
+ input string
+ wantField string
+ wantOrigin string
+ wantNew string
+ wantErr bool
+ }{
+ {
+ name: "valid byte conversion",
+ input: "committed=B->MB",
+ wantField: "committed",
+ wantOrigin: "B",
+ wantNew: "MB",
+ wantErr: false,
+ },
+ {
+ name: "valid with spaces",
+ input: "memory = KB -> GB",
+ wantField: "memory",
+ wantOrigin: "KB",
+ wantNew: "GB",
+ wantErr: false,
+ },
+ {
+ name: "time conversion",
+ input: "duration=ms->s",
+ wantField: "duration",
+ wantOrigin: "ms",
+ wantNew: "s",
+ wantErr: false,
+ },
+ {
+ name: "missing arrow",
+ input: "committed=B",
+ wantErr: true,
+ },
+ {
+ name: "missing equals",
+ input: "committed",
+ wantErr: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := ParseConversion(tt.input)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("ParseConversion() error = %v, wantErr
%v", err, tt.wantErr)
+ return
+ }
+ if !tt.wantErr {
+ if got.Field != tt.wantField {
+ t.Errorf("ParseConversion() Field = %v,
want %v", got.Field, tt.wantField)
+ }
+ if got.OriginUnit != tt.wantOrigin {
+ t.Errorf("ParseConversion() OriginUnit
= %v, want %v", got.OriginUnit, tt.wantOrigin)
+ }
+ if got.NewUnit != tt.wantNew {
+ t.Errorf("ParseConversion() NewUnit =
%v, want %v", got.NewUnit, tt.wantNew)
+ }
+ }
+ })
+ }
+}
+
+func TestConvert(t *testing.T) {
+ tests := []struct {
+ name string
+ value float64
+ originUnit string
+ newUnit string
+ want float64
+ wantErr bool
+ }{
+ {
+ name: "B to MB",
+ value: 1048576,
+ originUnit: "B",
+ newUnit: "MB",
+ want: 1,
+ wantErr: false,
+ },
+ {
+ name: "KB to MB",
+ value: 1024,
+ originUnit: "KB",
+ newUnit: "MB",
+ want: 1,
+ wantErr: false,
+ },
+ {
+ name: "MB to GB",
+ value: 1024,
+ originUnit: "MB",
+ newUnit: "GB",
+ want: 1,
+ wantErr: false,
+ },
+ {
+ name: "GB to MB",
+ value: 2,
+ originUnit: "GB",
+ newUnit: "MB",
+ want: 2048,
+ wantErr: false,
+ },
+ {
+ name: "same unit",
+ value: 100,
+ originUnit: "MB",
+ newUnit: "MB",
+ want: 100,
+ wantErr: false,
+ },
+ {
+ name: "case insensitive",
+ value: 1024,
+ originUnit: "kb",
+ newUnit: "mb",
+ want: 1,
+ wantErr: false,
+ },
+ {
+ name: "ms to s",
+ value: 1000,
+ originUnit: "ms",
+ newUnit: "s",
+ want: 1,
+ wantErr: false,
+ },
+ {
+ name: "s to ms",
+ value: 1,
+ originUnit: "s",
+ newUnit: "ms",
+ want: 1000,
+ wantErr: false,
+ },
+ {
+ name: "unsupported conversion",
+ value: 100,
+ originUnit: "MB",
+ newUnit: "UNKNOWN",
+ wantErr: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := Convert(tt.value, tt.originUnit, tt.newUnit)
+ if (err != nil) != tt.wantErr {
+ t.Errorf("Convert() error = %v, wantErr %v",
err, tt.wantErr)
+ return
+ }
+ if !tt.wantErr && got != tt.want {
+ t.Errorf("Convert() = %v, want %v", got,
tt.want)
+ }
+ })
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]