This is an automated email from the ASF dual-hosted git repository. shown pushed a commit to branch 0905-yuluo/refactor in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit f258f68beda6e01c7ceb85f45cf3444bfdcbb766 Author: yuluo-yx <[email protected]> AuthorDate: Fri Sep 5 23:34:46 2025 +0800 refactor: optimize server Signed-off-by: yuluo-yx <[email protected]> --- cmd/collector/hertzbeat-collector.yaml | 25 ---- cmd/collector/main.go | 18 +-- cmd/collector/root/root.go | 21 +++ docker-compose.yml | 1 + docs/SCHEDULING_ARCHITECTURE.md | 4 +- examples/main_simulation.go | 26 ++-- go.mod | 3 + go.sum | 8 ++ {pkg => internal}/banner/banner.txt | 2 +- {pkg => internal}/banner/embed.go | 24 +--- internal/cmd/server.go | 144 +++++++++++++++++++++ internal/cmd/version.go | 25 ++++ internal/cmd/version/version.go | 55 ++++++++ .../collector/basic/abstract_collect.go | 16 +-- .../collector/basic/database/jdbc_auto_register.go | 6 +- .../collector/basic/database/jdbc_collector.go | 6 +- .../basic/database/jdbc_collector_test.go | 6 +- .../basic => internal/collector/basic/dns}/.keep | 0 .../collector/basic/ftp}/.keep | 0 .../dns => internal/collector/basic/http}/.keep | 0 .../ftp => internal/collector/basic/icmp}/.keep | 0 .../http => internal/collector/basic/imap}/.keep | 0 .../icmp => internal/collector/basic/ipmi2}/.keep | 0 .../imap => internal/collector/basic/jmx}/.keep | 0 .../collector/basic/memcached}/.keep | 0 .../jmx => internal/collector/basic/modbus}/.keep | 0 .../collector/basic/mqtt}/.keep | 0 .../collector/basic/nginx}/.keep | 0 .../mqtt => internal/collector/basic/ntp}/.keep | 0 .../nginx => internal/collector/basic/plc}/.keep | 0 .../ntp => internal/collector/basic/pop3}/.keep | 0 .../collector/basic/prometheus}/.keep | 0 .../pop3 => internal/collector/basic/push}/.keep | 0 .../collector/basic/redfish}/.keep | 0 .../push => internal/collector/basic/redis}/.keep | 0 .../collector/basic/registry}/.keep | 0 .../redis => internal/collector/basic/s7}/.keep | 0 .../collector/basic/script}/.keep | 0 .../basic/s7 => internal/collector/basic/sd}/.keep | 0 .../script => internal/collector/basic/smtp}/.keep | 0 .../sd => internal/collector/basic/snmp}/.keep | 0 .../smtp => internal/collector/basic/ssh}/.keep | 0 .../snmp => internal/collector/basic/telnet}/.keep | 0 .../ssh => internal/collector/basic/udp}/.keep | 0 .../collector/basic/websocket}/.keep | 0 {pkg => internal}/collector/collect_service.go | 34 ++--- .../udp => internal/collector/common/cache}/.keep | 0 .../collector/common/collect/dispatch}/.keep | 0 .../collect/metrics/hertzbeat_metrics_collector.go | 0 .../common/collect/strategy/strategy_factory.go | 0 .../collector/common/dispatcher/entrance/client.go | 2 +- .../dispatcher/entrance/collect_job_service.go | 48 +++---- .../entrance/collect_job_service_test.go | 8 +- .../common/dispatcher/entrance/collect_server.go | 10 +- .../collector/common/dispatcher/entrance/config.go | 0 .../dispatcher/entrance/mysql_scheduling_test.go | 40 +++--- .../common/dispatcher/entrance/processors.go | 4 +- .../collector/common/dispatcher/exporter}/.keep | 0 .../collector/common/job/job_server.go | 2 +- .../collector/common/ssh}/.keep | 0 .../collector/common/timer/timer_dispatcher.go | 32 ++--- .../common/timer/timer_dispatcher_test.go | 4 +- .../collector/common/timer/timer_wheel.go | 4 +- .../collector/common/timer/timer_wheel_test.go | 4 +- .../collector/common/timer/wheel_timer_task.go | 24 ++-- .../collector/common/transport/transport.go | 2 +- .../collector/common/types/config}/config_types.go | 2 +- internal/collector/common/types/err/error_types.go | 9 ++ .../collector/common}/types/job/job_types.go | 0 .../collector/common}/types/job/metrics_types.go | 0 .../types/job/protocol/common_request_protocol.go | 0 .../types/job/protocol/consul_sd_protocol.go | 4 +- .../common}/types/job/protocol/ssh_protocol.go | 0 .../types/job/protocol/zookeeper_sd_protocol.go | 2 +- .../collector/common}/types/job/timeout_types.go | 0 .../common/types/logger}/logging_types.go | 14 +- {pkg => internal}/collector/config/config.go | 13 +- {pkg => internal}/collector/config/config_test.go | 2 +- .../collector/extension/kafka}/.keep | 0 .../collector/extension/mongodb}/.keep | 0 .../collector/extension/nebulagraph}/.keep | 0 .../collector/extension/rocketmq}/.keep | 0 {pkg => internal}/collector/registry.go | 6 +- .../collector/registry/registry_center.go | 4 +- internal/collector/server/server.go | 44 +++++++ {pkg => internal}/collector/server/server_test.go | 0 .../collector/worker/metrics_collect.go | 24 ++-- .../worker/metrics_collect_integration_test.go | 8 +- {pkg => internal}/collector/worker/task_queue.go | 2 +- .../collector/worker/task_queue_test.go | 2 +- {pkg => internal}/collector/worker/worker_pool.go | 2 +- .../collector/worker/worker_pool_test.go | 2 +- {pkg => internal}/constants/const.go | 0 {pkg => internal/util}/logger/logger.go | 39 +++--- {pkg => internal/util}/logger/logger_test.go | 23 ++-- internal/util/timer/timer.go | 8 ++ pkg/collector/bootstrap.go | 133 ------------------- pkg/collector/extension/nebulagraph/.keep | 0 pkg/collector/extension/rocketmq/.keep | 0 pkg/collector/server/server.go | 86 ------------ pkg/util/.keep | 0 tools/ci-config/labeler.yml | 2 +- tools/make/golang.mk | 9 +- 103 files changed, 546 insertions(+), 502 deletions(-) diff --git a/cmd/collector/hertzbeat-collector.yaml b/cmd/collector/hertzbeat-collector.yaml deleted file mode 100644 index 3cae1f6..0000000 --- a/cmd/collector/hertzbeat-collector.yaml +++ /dev/null @@ -1,25 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -collector: - info: - name: hertzbeat-collector-go - ip: 127.0.0.1 - port: 8080 - - log: - level: debug diff --git a/cmd/collector/main.go b/cmd/collector/main.go index cea5661..ef624cb 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -18,26 +18,16 @@ package main import ( - "flag" + "fmt" "os" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector" + "hertzbeat.apache.org/hertzbeat-collector-go/cmd/collector/root" ) -// go build -ldflags "-X main.Version=x.y.z" -var ( - ConfPath string - Version string -) - -func init() { - flag.StringVar(&ConfPath, "conf", "hertzbeat-collector.yaml", "path to config file, eg: -conf ./hertzbeat-collector.yaml") -} - func main() { - if err := collector.Bootstrap(ConfPath, Version); err != nil { - + if err := root.GetRootCommand().Execute(); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) os.Exit(1) } } diff --git a/cmd/collector/root/root.go b/cmd/collector/root/root.go new file mode 100644 index 0000000..4893e5e --- /dev/null +++ b/cmd/collector/root/root.go @@ -0,0 +1,21 @@ +package root + +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/internal/cmd" + + "github.com/spf13/cobra" +) + +func GetRootCommand() *cobra.Command { + + c := &cobra.Command{ + Use: "hertzbeat-collector-go", + Short: "HertzBeat Collector Go", + Long: "Apache Hertzbeat Collector Go Impl", + } + + c.AddCommand(cmd.VersionCommand()) + c.AddCommand(cmd.ServerCommand()) + + return c +} diff --git a/docker-compose.yml b/docker-compose.yml index e9ead6c..2a4eea7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +# todo:待补充和完善 services: # collector service hertzbeat-collector: diff --git a/docs/SCHEDULING_ARCHITECTURE.md b/docs/SCHEDULING_ARCHITECTURE.md index acaa5f9..cc9ad74 100644 --- a/docs/SCHEDULING_ARCHITECTURE.md +++ b/docs/SCHEDULING_ARCHITECTURE.md @@ -226,7 +226,7 @@ func RegisterBuiltinCollectors(service *CollectService, logger logger.Logger) { } // 新的自动注册机制 -// pkg/collector/basic/database/jdbc_auto_register.go +// internal/collector/basic/database/jdbc_auto_register.go func init() { registry.RegisterCollectorFactory( "jdbc", @@ -435,7 +435,7 @@ func init() { } // 步骤3: 在registry.go中添加导入 -_ "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic/redis" +_ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/redis" ``` ### 2. 调试和监控 diff --git a/examples/main_simulation.go b/examples/main_simulation.go index 4924876..7e97953 100644 --- a/examples/main_simulation.go +++ b/examples/main_simulation.go @@ -26,11 +26,11 @@ import ( "syscall" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/dispatcher/entrance" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher/entrance" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + logger2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // go build -ldflags "-X main.Version=x.y.z" @@ -49,7 +49,7 @@ func main() { flag.Parse() // 初始化日志 - log := logger.DefaultLogger(os.Stdout, types.LogLevelInfo) + log := logger.DefaultLogger(os.Stdout, logger2.LogLevelInfo) log.Info("🚀 启动HertzBeat Collector", "version", Version, "simulation", Simulation) Simulation = true if Simulation { @@ -108,8 +108,8 @@ func runSimulationMode(log logger.Logger) { } // createManagerJDBCTask 创建模拟从Manager接收的JDBC采集任务 -func createManagerJDBCTask() *jobtypes.Job { - return &jobtypes.Job{ +func createManagerJDBCTask() *job.Job { + return &job.Job{ ID: 1001, TenantID: 1, MonitorID: 2001, @@ -144,7 +144,7 @@ func createManagerJDBCTask() *jobtypes.Job { }, // JDBC采集指标配置 - Metrics: []jobtypes.Metrics{ + Metrics: []job.Metrics{ { Name: "mysql_basic_info", Priority: 0, @@ -153,13 +153,13 @@ func createManagerJDBCTask() *jobtypes.Job { Port: "3306", Timeout: "15s", Interval: 30, - Fields: []jobtypes.Field{ + Fields: []job.Field{ {Field: "database_name", Type: 1, Label: true}, {Field: "version", Type: 1, Label: false}, {Field: "uptime_seconds", Type: 0, Label: false}, {Field: "server_id", Type: 0, Label: false}, }, - JDBC: &jobtypes.JDBCProtocol{ + JDBC: &job.JDBCProtocol{ Host: "localhost", Port: "3306", Platform: "mysql", @@ -180,7 +180,7 @@ func createManagerJDBCTask() *jobtypes.Job { } // simulateManagerTasks 模拟Manager定期发送采集任务 -func simulateManagerTasks(ctx context.Context, collectServer *entrance.CollectServer, job *jobtypes.Job, log logger.Logger) { +func simulateManagerTasks(ctx context.Context, collectServer *entrance.CollectServer, job *job.Job, log logger.Logger) { log.Info("📡 开始模拟Manager任务调度", "jobId", job.ID, "interval", "60s") // 创建任务响应监听器 (模拟发送结果回Manager) @@ -251,7 +251,7 @@ func (mrs *ManagerResponseSimulator) Response(metricsData []interface{}) { // 模拟处理每个采集指标的结果 for i, data := range metricsData { - if collectData, ok := data.(*jobtypes.CollectRepMetricsData); ok { + if collectData, ok := data.(*job.CollectRepMetricsData); ok { mrs.logger.Info("📊 处理采集指标", "metricIndex", i+1, "metricName", collectData.Metrics, diff --git a/go.mod b/go.mod index 8240bbf..51bad35 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/lib/pq v1.10.9 github.com/microsoft/go-mssqldb v1.9.3 github.com/prometheus/client_golang v1.23.0 + github.com/spf13/cobra v1.10.1 github.com/stretchr/testify v1.10.0 go.uber.org/zap v1.27.0 google.golang.org/protobuf v1.36.6 @@ -23,12 +24,14 @@ require ( github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.65.0 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect + github.com/spf13/pflag v1.0.9 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.38.0 // indirect golang.org/x/sys v0.33.0 // indirect diff --git a/go.sum b/go.sum index 7933e94..614d29a 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= @@ -34,6 +35,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -60,6 +63,11 @@ github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzM github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s= +github.com/spf13/cobra v1.10.1/go.mod h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0= +github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= +github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= diff --git a/pkg/banner/banner.txt b/internal/banner/banner.txt similarity index 77% rename from pkg/banner/banner.txt rename to internal/banner/banner.txt index 3a3161d..51c6457 100644 --- a/pkg/banner/banner.txt +++ b/internal/banner/banner.txt @@ -1,5 +1,5 @@ _ _ _ ____ _ | | | | ___ _ __| |_ ___| __ ) ___ __ _| |_ | |_| |/ _ \ '__| __|_ / _ \ / _ \/ _` | __| - | _ | __/ | | |_ / /| |_) | __/ (_| | |_ Name: {{ .CollectorName }} Port: {{ .ServerPort }} Pid: {{ .Pid }} Version: {{ .Version }} + | _ | __/ | | |_ / /| |_) | __/ (_| | |_ Name: {{ .CollectorName }} Port: {{ .ServerPort }} Pid: {{ .Pid }} |_| |_|\___|_| \__/___|____/ \___|\__,_|\__| https://hertzbeat.apache.org/ diff --git a/pkg/banner/embed.go b/internal/banner/embed.go similarity index 68% rename from pkg/banner/embed.go rename to internal/banner/embed.go index 82b8727..0e878d6 100644 --- a/pkg/banner/embed.go +++ b/internal/banner/embed.go @@ -27,41 +27,22 @@ import ( //go:embed banner.txt var EmbedLogo embed.FS -// LoggerInterface defines the interface for logging -type LoggerInterface interface { - Error(err error, msg string) - Info(msg string, keysAndValues ...interface{}) -} - -type Banner struct { - logger LoggerInterface -} - -func New(logger LoggerInterface) *Banner { - return &Banner{logger: logger} -} - type bannerVars struct { CollectorName string ServerPort string Pid string - Version string } -func (b *Banner) PrintBanner(appName, port string) error { - return b.PrintBannerWithVersion(appName, port, "unknown") -} +func PrintBanner(appName, port string) error { -func (b *Banner) PrintBannerWithVersion(appName, port, version string) error { data, err := EmbedLogo.ReadFile("banner.txt") if err != nil { - b.logger.Error(err, "read banner file failed") + return err } tmpl, err := template.New("banner").Parse(string(data)) if err != nil { - b.logger.Error(err, "parse banner template failed") return err } @@ -69,7 +50,6 @@ func (b *Banner) PrintBannerWithVersion(appName, port, version string) error { CollectorName: appName, ServerPort: port, Pid: strconv.Itoa(os.Getpid()), - Version: version, } err = tmpl.Execute(os.Stdout, vars) diff --git a/internal/cmd/server.go b/internal/cmd/server.go new file mode 100644 index 0000000..d992a01 --- /dev/null +++ b/internal/cmd/server.go @@ -0,0 +1,144 @@ +package cmd + +import ( + "context" + "fmt" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types" + "os" + "os/signal" + "sync" + "syscall" + + "github.com/spf13/cobra" + + "collector-go/internal/collector/common/config" + "collector-go/internal/collector/common/job" + clrServer "collector-go/internal/collector/common/server" + "collector-go/internal/collector/common/transport" +) + +var ( + cfgPath string +) + +type Runner[I types.Info] interface { + Start(ctx context.Context) error + Info() I + Close() error +} + +func ServerCommand() *cobra.Command { + + cmd := &cobra.Command{ + Use: "server", + Aliases: []string{"server", "srv", "s"}, + Short: "Server Hertzbeat Collector Go", + RunE: func(cmd *cobra.Command, args []string) error { + return server(cmd.Context()) + }, + } + + cmd.Flags().StringVarP(&cfgPath, "config", "c", "", "config file path") + + return cmd +} + +func getConfigByPath() (*config.Config, error) { + + cfgServer, err := config.New(cfgPath) + if err != nil { + return nil, err + } + + cfg, err := cfgServer.Loader() + if err != nil { + return nil, err + } + + return cfg, nil +} + +func serverByCfg(cfg *config.Config) *clrServer.Server { + + return clrServer.New(cfg) +} + +func server(ctx context.Context) error { + cfg, err := getConfigByPath() + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + + collectorServer := serverByCfg(cfg) + + // 创建一个带取消的context + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // 启动runners并等待完成或错误 + return startRunners(ctx, collectorServer) +} + +func startRunners(ctx context.Context, cfg *clrServer.Server) error { + + runners := []struct { + runner Runner[types.Info] + }{ + { + job.New(&job.Config{ + Server: *cfg, + }), + }, + { + transport.New(&transport.Config{ + Server: *cfg, + }), + }, + // metrics + } + + errCh := make(chan error, len(runners)) + + var wg sync.WaitGroup + + for _, r := range runners { + wg.Add(1) + go func(runner Runner[types.Info]) { + defer wg.Done() + fmt.Printf("Starting runner: %s\n", runner.Info().Name) + + if err := runner.Start(ctx); err != nil { + select { + case errCh <- err: + default: + } + } + }(r.runner) + } + + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + + cleanup := func() { + signal.Stop(signalCh) + for _, r := range runners { + if err := r.runner.Close(); err != nil { + fmt.Printf("error closing runner %s: %v\n", r.runner.Info(), err) + } + } + } + + select { + case <-ctx.Done(): + fmt.Println("Context cancelled") + cleanup() + return ctx.Err() + case sig := <-signalCh: + fmt.Printf("Received signal: %v\n", sig) + cleanup() + return nil + case err := <-errCh: + cleanup() + return fmt.Errorf("runner error: %w", err) + } +} diff --git a/internal/cmd/version.go b/internal/cmd/version.go new file mode 100644 index 0000000..027548e --- /dev/null +++ b/internal/cmd/version.go @@ -0,0 +1,25 @@ +package cmd + +import ( + "collector-go/internal/cmd/version" + + "github.com/spf13/cobra" +) + +func VersionCommand() *cobra.Command { + + var output string + + cmd := &cobra.Command{ + Use: "version", + Aliases: []string{"versions", "v"}, + Short: "Show versions", + RunE: func(cmd *cobra.Command, args []string) error { + return version.Print(cmd.OutOrStdout(), output) + }, + } + + cmd.PersistentFlags().StringVarP(&output, "output", "o", "", "One of 'yaml' or 'json'") + + return cmd +} diff --git a/internal/cmd/version/version.go b/internal/cmd/version/version.go new file mode 100644 index 0000000..cdffb6c --- /dev/null +++ b/internal/cmd/version/version.go @@ -0,0 +1,55 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package version + +import ( + "encoding/json" + "fmt" + "io" + "runtime" + + "gopkg.in/yaml.v3" +) + +type Info struct { + HCGVersion string `json:"envoyGatewayVersion"` + GitCommitID string `json:"gitCommitID"` + GolangVersion string `json:"golangVersion"` +} + +func Get() Info { + return Info{ + HCGVersion: hcgVersion, + GitCommitID: gitCommitID, + GolangVersion: runtime.Version(), + } +} + +var ( + hcgVersion string + gitCommitID string +) + +// Print shows the versions of the Envoy Gateway. +func Print(w io.Writer, format string) error { + v := Get() + switch format { + case "json": + if marshalled, err := json.MarshalIndent(v, "", " "); err == nil { + _, _ = fmt.Fprintln(w, string(marshalled)) + } + case "yaml": + if marshalled, err := yaml.Marshal(v); err == nil { + _, _ = fmt.Fprintln(w, string(marshalled)) + } + default: + _, _ = fmt.Fprintf(w, "HERTZBEAT_COLECTOR_GO_VERSION: %s\n", v.HCGVersion) + _, _ = fmt.Fprintf(w, "GIT_COMMIT_ID: %s\n", v.GitCommitID) + _, _ = fmt.Fprintf(w, "GOLANG_VERSION: %s\n", v.GolangVersion) + } + + return nil +} diff --git a/pkg/collector/basic/abstract_collect.go b/internal/collector/basic/abstract_collect.go similarity index 92% rename from pkg/collector/basic/abstract_collect.go rename to internal/collector/basic/abstract_collect.go index 593b55d..c67e618 100644 --- a/pkg/collector/basic/abstract_collect.go +++ b/internal/collector/basic/abstract_collect.go @@ -18,10 +18,9 @@ package basic import ( - "time" - - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/timer" ) // AbstractCollector defines the interface for all collectors @@ -56,7 +55,7 @@ func (bc *BaseCollector) CreateSuccessResponse(metrics *jobtypes.Metrics) *jobty App: "", // Will be set by the calling context Metrics: metrics.Name, Priority: 0, - Time: getCurrentTimeMillis(), + Time: timer.GetCurrentTimeMillis(), Code: 200, // Success Msg: "success", Fields: make([]jobtypes.Field, 0), @@ -74,7 +73,7 @@ func (bc *BaseCollector) CreateFailResponse(metrics *jobtypes.Metrics, code int, App: "", // Will be set by the calling context Metrics: metrics.Name, Priority: 0, - Time: getCurrentTimeMillis(), + Time: timer.GetCurrentTimeMillis(), Code: code, Msg: message, Fields: make([]jobtypes.Field, 0), @@ -118,8 +117,3 @@ func (cr *CollectorRegistry) GetSupportedProtocols() []string { } return protocols } - -// getCurrentTimeMillis returns current time in milliseconds -func getCurrentTimeMillis() int64 { - return time.Now().UnixMilli() -} diff --git a/pkg/collector/basic/database/jdbc_auto_register.go b/internal/collector/basic/database/jdbc_auto_register.go similarity index 85% rename from pkg/collector/basic/database/jdbc_auto_register.go rename to internal/collector/basic/database/jdbc_auto_register.go index 9eac413..dcaf13f 100644 --- a/pkg/collector/basic/database/jdbc_auto_register.go +++ b/internal/collector/basic/database/jdbc_auto_register.go @@ -18,9 +18,9 @@ package database import ( - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/registry" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/registry" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // init 函数会在包被导入时自动执行,完成JDBC采集器的自动注册 diff --git a/pkg/collector/basic/database/jdbc_collector.go b/internal/collector/basic/database/jdbc_collector.go similarity index 98% rename from pkg/collector/basic/database/jdbc_collector.go rename to internal/collector/basic/database/jdbc_collector.go index 924ca18..361ccc9 100644 --- a/pkg/collector/basic/database/jdbc_collector.go +++ b/internal/collector/basic/database/jdbc_collector.go @@ -28,10 +28,10 @@ import ( _ "github.com/go-sql-driver/mysql" _ "github.com/lib/pq" _ "github.com/microsoft/go-mssqldb" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" ) const ( diff --git a/pkg/collector/basic/database/jdbc_collector_test.go b/internal/collector/basic/database/jdbc_collector_test.go similarity index 95% rename from pkg/collector/basic/database/jdbc_collector_test.go rename to internal/collector/basic/database/jdbc_collector_test.go index 58048cf..ecc14b2 100644 --- a/pkg/collector/basic/database/jdbc_collector_test.go +++ b/internal/collector/basic/database/jdbc_collector_test.go @@ -21,9 +21,9 @@ package database // "os" // "testing" // -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" -// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/logger" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/types" +// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" //) // //func TestJDBCCollector_SupportProtocol(t *testing.T) { diff --git a/pkg/collector/basic/.keep b/internal/collector/basic/dns/.keep similarity index 100% rename from pkg/collector/basic/.keep rename to internal/collector/basic/dns/.keep diff --git a/pkg/collector/basic/database/.keep b/internal/collector/basic/ftp/.keep similarity index 100% rename from pkg/collector/basic/database/.keep rename to internal/collector/basic/ftp/.keep diff --git a/pkg/collector/basic/dns/.keep b/internal/collector/basic/http/.keep similarity index 100% rename from pkg/collector/basic/dns/.keep rename to internal/collector/basic/http/.keep diff --git a/pkg/collector/basic/ftp/.keep b/internal/collector/basic/icmp/.keep similarity index 100% rename from pkg/collector/basic/ftp/.keep rename to internal/collector/basic/icmp/.keep diff --git a/pkg/collector/basic/http/.keep b/internal/collector/basic/imap/.keep similarity index 100% rename from pkg/collector/basic/http/.keep rename to internal/collector/basic/imap/.keep diff --git a/pkg/collector/basic/icmp/.keep b/internal/collector/basic/ipmi2/.keep similarity index 100% rename from pkg/collector/basic/icmp/.keep rename to internal/collector/basic/ipmi2/.keep diff --git a/pkg/collector/basic/imap/.keep b/internal/collector/basic/jmx/.keep similarity index 100% rename from pkg/collector/basic/imap/.keep rename to internal/collector/basic/jmx/.keep diff --git a/pkg/collector/basic/ipmi2/.keep b/internal/collector/basic/memcached/.keep similarity index 100% rename from pkg/collector/basic/ipmi2/.keep rename to internal/collector/basic/memcached/.keep diff --git a/pkg/collector/basic/jmx/.keep b/internal/collector/basic/modbus/.keep similarity index 100% rename from pkg/collector/basic/jmx/.keep rename to internal/collector/basic/modbus/.keep diff --git a/pkg/collector/basic/memcached/.keep b/internal/collector/basic/mqtt/.keep similarity index 100% rename from pkg/collector/basic/memcached/.keep rename to internal/collector/basic/mqtt/.keep diff --git a/pkg/collector/basic/modbus/.keep b/internal/collector/basic/nginx/.keep similarity index 100% rename from pkg/collector/basic/modbus/.keep rename to internal/collector/basic/nginx/.keep diff --git a/pkg/collector/basic/mqtt/.keep b/internal/collector/basic/ntp/.keep similarity index 100% rename from pkg/collector/basic/mqtt/.keep rename to internal/collector/basic/ntp/.keep diff --git a/pkg/collector/basic/nginx/.keep b/internal/collector/basic/plc/.keep similarity index 100% rename from pkg/collector/basic/nginx/.keep rename to internal/collector/basic/plc/.keep diff --git a/pkg/collector/basic/ntp/.keep b/internal/collector/basic/pop3/.keep similarity index 100% rename from pkg/collector/basic/ntp/.keep rename to internal/collector/basic/pop3/.keep diff --git a/pkg/collector/basic/plc/.keep b/internal/collector/basic/prometheus/.keep similarity index 100% rename from pkg/collector/basic/plc/.keep rename to internal/collector/basic/prometheus/.keep diff --git a/pkg/collector/basic/pop3/.keep b/internal/collector/basic/push/.keep similarity index 100% rename from pkg/collector/basic/pop3/.keep rename to internal/collector/basic/push/.keep diff --git a/pkg/collector/basic/prometheus/.keep b/internal/collector/basic/redfish/.keep similarity index 100% rename from pkg/collector/basic/prometheus/.keep rename to internal/collector/basic/redfish/.keep diff --git a/pkg/collector/basic/push/.keep b/internal/collector/basic/redis/.keep similarity index 100% rename from pkg/collector/basic/push/.keep rename to internal/collector/basic/redis/.keep diff --git a/pkg/collector/basic/redfish/.keep b/internal/collector/basic/registry/.keep similarity index 100% rename from pkg/collector/basic/redfish/.keep rename to internal/collector/basic/registry/.keep diff --git a/pkg/collector/basic/redis/.keep b/internal/collector/basic/s7/.keep similarity index 100% rename from pkg/collector/basic/redis/.keep rename to internal/collector/basic/s7/.keep diff --git a/pkg/collector/basic/registry/.keep b/internal/collector/basic/script/.keep similarity index 100% rename from pkg/collector/basic/registry/.keep rename to internal/collector/basic/script/.keep diff --git a/pkg/collector/basic/s7/.keep b/internal/collector/basic/sd/.keep similarity index 100% rename from pkg/collector/basic/s7/.keep rename to internal/collector/basic/sd/.keep diff --git a/pkg/collector/basic/script/.keep b/internal/collector/basic/smtp/.keep similarity index 100% rename from pkg/collector/basic/script/.keep rename to internal/collector/basic/smtp/.keep diff --git a/pkg/collector/basic/sd/.keep b/internal/collector/basic/snmp/.keep similarity index 100% rename from pkg/collector/basic/sd/.keep rename to internal/collector/basic/snmp/.keep diff --git a/pkg/collector/basic/smtp/.keep b/internal/collector/basic/ssh/.keep similarity index 100% rename from pkg/collector/basic/smtp/.keep rename to internal/collector/basic/ssh/.keep diff --git a/pkg/collector/basic/snmp/.keep b/internal/collector/basic/telnet/.keep similarity index 100% rename from pkg/collector/basic/snmp/.keep rename to internal/collector/basic/telnet/.keep diff --git a/pkg/collector/basic/ssh/.keep b/internal/collector/basic/udp/.keep similarity index 100% rename from pkg/collector/basic/ssh/.keep rename to internal/collector/basic/udp/.keep diff --git a/pkg/collector/basic/telnet/.keep b/internal/collector/basic/websocket/.keep similarity index 100% rename from pkg/collector/basic/telnet/.keep rename to internal/collector/basic/websocket/.keep diff --git a/pkg/collector/collect_service.go b/internal/collector/collect_service.go similarity index 90% rename from pkg/collector/collect_service.go rename to internal/collector/collect_service.go index 7714e56..b32562b 100644 --- a/pkg/collector/collect_service.go +++ b/internal/collector/collect_service.go @@ -21,10 +21,10 @@ import ( "fmt" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/timer" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/timer" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // CollectService manages all collectors and provides a unified interface @@ -84,10 +84,10 @@ func (cs *CollectService) RegisterCollector(protocol string, collector basic.Abs } // Collect performs metrics collection using the appropriate collector -func (cs *CollectService) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData { +func (cs *CollectService) Collect(metrics *job.Metrics) *job.CollectRepMetricsData { if metrics == nil { cs.logger.Error(fmt.Errorf("metrics is nil"), "failed to collect metrics") - return &jobtypes.CollectRepMetricsData{ + return &job.CollectRepMetricsData{ Code: 500, Msg: "metrics configuration is nil", } @@ -96,7 +96,7 @@ func (cs *CollectService) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRe protocol := metrics.Protocol if protocol == "" { cs.logger.Error(fmt.Errorf("protocol not specified"), "failed to collect metrics") - return &jobtypes.CollectRepMetricsData{ + return &job.CollectRepMetricsData{ Code: 500, Msg: "protocol not specified", } @@ -105,7 +105,7 @@ func (cs *CollectService) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRe collector, exists := cs.registry.GetCollector(protocol) if !exists { cs.logger.Error(fmt.Errorf("no collector found for protocol: %s", protocol), "failed to collect metrics") - return &jobtypes.CollectRepMetricsData{ + return &job.CollectRepMetricsData{ Code: 500, Msg: fmt.Sprintf("unsupported protocol: %s", protocol), } @@ -120,7 +120,7 @@ func (cs *CollectService) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRe // Pre-check the metrics configuration if err := collector.PreCheck(metrics); err != nil { cs.logger.Error(err, "metrics pre-check failed", "protocol", protocol) - return &jobtypes.CollectRepMetricsData{ + return &job.CollectRepMetricsData{ Code: 400, Msg: fmt.Sprintf("pre-check failed: %v", err), } @@ -149,7 +149,7 @@ func (cs *CollectService) GetCollector(protocol string) (basic.AbstractCollector } // DispatchMetricsTask implements MetricsTaskDispatcher interface -func (cs *CollectService) DispatchMetricsTask(timeout *jobtypes.Timeout) error { +func (cs *CollectService) DispatchMetricsTask(timeout *job.Timeout) error { if timeout == nil { cs.logger.Error(fmt.Errorf("timeout is nil"), "failed to dispatch metrics task") return fmt.Errorf("timeout cannot be nil") @@ -158,7 +158,7 @@ func (cs *CollectService) DispatchMetricsTask(timeout *jobtypes.Timeout) error { cs.logger.Info("dispatching metrics task") // Extract job information from timeout task - var job *jobtypes.Job + var job *job.Job if wheelTask, ok := timeout.Task().(*timer.WheelTimerTask); ok { job = wheelTask.GetJob() } else { @@ -212,7 +212,7 @@ func (cs *CollectService) DispatchMetricsTask(timeout *jobtypes.Timeout) error { } // createAndSubmitTask creates a metrics collection task and submits it to the worker pool -func (cs *CollectService) createAndSubmitTask(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout, job *jobtypes.Job) error { +func (cs *CollectService) createAndSubmitTask(metrics *job.Metrics, timeout *job.Timeout, job *job.Job) error { // Validate metrics if metrics == nil { return fmt.Errorf("metrics is nil") @@ -262,7 +262,7 @@ func (cs *CollectService) createAndSubmitTask(metrics *jobtypes.Metrics, timeout } // calculateTaskPriority calculates the priority for a metrics collection task -func (cs *CollectService) calculateTaskPriority(metrics *jobtypes.Metrics, job *jobtypes.Job) int { +func (cs *CollectService) calculateTaskPriority(metrics *job.Metrics, job *job.Job) int { priority := cs.defaultPriority // Adjust priority based on metrics configuration @@ -291,9 +291,9 @@ func (cs *CollectService) calculateTaskPriority(metrics *jobtypes.Metrics, job * } // collectWithPriority performs metrics collection with priority and timeout handling -func (cs *CollectService) collectWithPriority(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout, priority int) *jobtypes.CollectRepMetricsData { +func (cs *CollectService) collectWithPriority(metrics *job.Metrics, timeout *job.Timeout, priority int) *job.CollectRepMetricsData { // Create channel for result - resultChan := make(chan *jobtypes.CollectRepMetricsData, 1) + resultChan := make(chan *job.CollectRepMetricsData, 1) // Calculate timeout duration based on priority (higher priority gets more time) baseTimeout := 60 * time.Second @@ -323,7 +323,7 @@ func (cs *CollectService) collectWithPriority(metrics *jobtypes.Metrics, timeout if r := recover(); r != nil { cs.logger.Error(fmt.Errorf("panic in collection: %v", r), "metrics collection panicked", "metrics", metrics.Name, "protocol", metrics.Protocol, "priority", priority) - resultChan <- &jobtypes.CollectRepMetricsData{ + resultChan <- &job.CollectRepMetricsData{ Code: 500, Msg: fmt.Sprintf("collection panicked: %v", r), } @@ -350,7 +350,7 @@ func (cs *CollectService) collectWithPriority(metrics *jobtypes.Metrics, timeout "priority", priority, "timeout", timeoutDuration) - return &jobtypes.CollectRepMetricsData{ + return &job.CollectRepMetricsData{ Code: 504, // Gateway Timeout Msg: fmt.Sprintf("collection timed out after %v (priority: %d)", timeoutDuration, priority), } diff --git a/pkg/collector/basic/udp/.keep b/internal/collector/common/cache/.keep similarity index 100% rename from pkg/collector/basic/udp/.keep rename to internal/collector/common/cache/.keep diff --git a/pkg/collector/basic/websocket/.keep b/internal/collector/common/collect/dispatch/.keep similarity index 100% rename from pkg/collector/basic/websocket/.keep rename to internal/collector/common/collect/dispatch/.keep diff --git a/pkg/collector/common/collect/metrics/hertzbeat_metrics_collector.go b/internal/collector/common/collect/metrics/hertzbeat_metrics_collector.go similarity index 100% rename from pkg/collector/common/collect/metrics/hertzbeat_metrics_collector.go rename to internal/collector/common/collect/metrics/hertzbeat_metrics_collector.go diff --git a/pkg/collector/common/collect/strategy/strategy_factory.go b/internal/collector/common/collect/strategy/strategy_factory.go similarity index 100% rename from pkg/collector/common/collect/strategy/strategy_factory.go rename to internal/collector/common/collect/strategy/strategy_factory.go diff --git a/pkg/collector/common/dispatcher/entrance/client.go b/internal/collector/common/dispatcher/entrance/client.go similarity index 99% rename from pkg/collector/common/dispatcher/entrance/client.go rename to internal/collector/common/dispatcher/entrance/client.go index 9cb6e49..358e715 100644 --- a/pkg/collector/common/dispatcher/entrance/client.go +++ b/internal/collector/common/dispatcher/entrance/client.go @@ -30,7 +30,7 @@ import ( "google.golang.org/protobuf/proto" "hertzbeat.apache.org/hertzbeat-collector-go/api" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // MessageProcessor defines the interface for processing received messages. diff --git a/pkg/collector/common/dispatcher/entrance/collect_job_service.go b/internal/collector/common/dispatcher/entrance/collect_job_service.go similarity index 84% rename from pkg/collector/common/dispatcher/entrance/collect_job_service.go rename to internal/collector/common/dispatcher/entrance/collect_job_service.go index 3a6482e..2e35969 100644 --- a/pkg/collector/common/dispatcher/entrance/collect_job_service.go +++ b/internal/collector/common/dispatcher/entrance/collect_job_service.go @@ -25,10 +25,10 @@ import ( "time" "hertzbeat.apache.org/hertzbeat-collector-go/api" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/timer" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/worker" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/timer" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/worker" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // CollectJobService manages collection jobs and provides API interface. @@ -41,7 +41,7 @@ type CollectJobService struct { logger logger.Logger // For synchronous job data collection - responseListeners map[int64]chan []jobtypes.CollectRepMetricsData + responseListeners map[int64]chan []job.CollectRepMetricsData listenersMutex sync.RWMutex } @@ -64,7 +64,7 @@ func NewCollectJobService( collectorIdentity: identity, collectorMode: collectorConfig.Mode, logger: logger.WithName("collect-job-service"), - responseListeners: make(map[int64]chan []jobtypes.CollectRepMetricsData), + responseListeners: make(map[int64]chan []job.CollectRepMetricsData), } } @@ -74,8 +74,8 @@ func (cjs *CollectJobService) SetNetworkClient(client *NetworkClient) { } // CollectSyncJobData executes a one-time collection task and returns the collected data. -func (cjs *CollectJobService) CollectSyncJobData(job *jobtypes.Job) ([]jobtypes.CollectRepMetricsData, error) { - responseChan := make(chan []jobtypes.CollectRepMetricsData, 1) +func (cjs *CollectJobService) CollectSyncJobData(job *job.Job) ([]job.CollectRepMetricsData, error) { + responseChan := make(chan []job.CollectRepMetricsData, 1) // Register response listener cjs.listenersMutex.Lock() @@ -101,12 +101,12 @@ func (cjs *CollectJobService) CollectSyncJobData(job *jobtypes.Job) ([]jobtypes. case <-time.After(120 * time.Second): cjs.logger.Info("sync task runs for 120 seconds with no response, returning empty result", "jobId", job.MonitorID) - return []jobtypes.CollectRepMetricsData{}, nil + return []job.CollectRepMetricsData{}, nil } } // CollectSyncOneTimeJobData executes a one-time collection task and sends the response. -func (cjs *CollectJobService) CollectSyncOneTimeJobData(job *jobtypes.Job) error { +func (cjs *CollectJobService) CollectSyncOneTimeJobData(job *job.Job) error { // Execute the job in a goroutine to avoid blocking return cjs.workerPool.Submit(&syncJobTask{ job: job, @@ -116,7 +116,7 @@ func (cjs *CollectJobService) CollectSyncOneTimeJobData(job *jobtypes.Job) error } // AddAsyncCollectJob adds a periodic asynchronous collection task. -func (cjs *CollectJobService) AddAsyncCollectJob(job *jobtypes.Job) error { +func (cjs *CollectJobService) AddAsyncCollectJob(job *job.Job) error { // Clone the job and mark as cyclic jobCopy := *job jobCopy.IsCyclic = true @@ -134,11 +134,11 @@ func (cjs *CollectJobService) CancelAsyncCollectJob(jobID int64) error { } // DispatchCollectData implements CollectDataDispatcher interface. -func (cjs *CollectJobService) DispatchCollectData(timeout *jobtypes.Timeout, metrics *jobtypes.Metrics, metricsData []*worker.MetricsData) error { +func (cjs *CollectJobService) DispatchCollectData(timeout *job.Timeout, metrics *job.Metrics, metricsData []*worker.MetricsData) error { // Convert worker.MetricsData to jobtypes.CollectRepMetricsData - convertedData := make([]jobtypes.CollectRepMetricsData, 0, len(metricsData)) + convertedData := make([]job.CollectRepMetricsData, 0, len(metricsData)) for _, data := range metricsData { - convertedData = append(convertedData, jobtypes.CollectRepMetricsData{ + convertedData = append(convertedData, job.CollectRepMetricsData{ ID: data.ID, MonitorID: data.ID, // Use ID as MonitorID TenantID: data.TenantID, @@ -179,13 +179,13 @@ func (cjs *CollectJobService) DispatchCollectData(timeout *jobtypes.Timeout, met } // SendAsyncCollectData sends asynchronous collect response data to the manager. -func (cjs *CollectJobService) SendAsyncCollectData(metricsData jobtypes.CollectRepMetricsData) error { +func (cjs *CollectJobService) SendAsyncCollectData(metricsData job.CollectRepMetricsData) error { if cjs.networkClient == nil { return fmt.Errorf("network client is not set") } // Serialize metrics data - data, err := cjs.serializeMetricsData([]jobtypes.CollectRepMetricsData{metricsData}) + data, err := cjs.serializeMetricsData([]job.CollectRepMetricsData{metricsData}) if err != nil { return fmt.Errorf("failed to serialize metrics data: %w", err) } @@ -201,13 +201,13 @@ func (cjs *CollectJobService) SendAsyncCollectData(metricsData jobtypes.CollectR } // SendAsyncServiceDiscoveryData sends asynchronous service discovery data to the manager. -func (cjs *CollectJobService) SendAsyncServiceDiscoveryData(metricsData jobtypes.CollectRepMetricsData) error { +func (cjs *CollectJobService) SendAsyncServiceDiscoveryData(metricsData job.CollectRepMetricsData) error { if cjs.networkClient == nil { return fmt.Errorf("network client is not set") } // Serialize metrics data - data, err := cjs.serializeMetricsData([]jobtypes.CollectRepMetricsData{metricsData}) + data, err := cjs.serializeMetricsData([]job.CollectRepMetricsData{metricsData}) if err != nil { return fmt.Errorf("failed to serialize metrics data: %w", err) } @@ -223,7 +223,7 @@ func (cjs *CollectJobService) SendAsyncServiceDiscoveryData(metricsData jobtypes } // ResponseSyncJobData handles responses from synchronous jobs. -func (cjs *CollectJobService) ResponseSyncJobData(jobID int64, metricsData []jobtypes.CollectRepMetricsData) { +func (cjs *CollectJobService) ResponseSyncJobData(jobID int64, metricsData []job.CollectRepMetricsData) { cjs.listenersMutex.RLock() responseChan, exists := cjs.responseListeners[jobID] cjs.listenersMutex.RUnlock() @@ -249,7 +249,7 @@ func (cjs *CollectJobService) GetCollectorMode() string { } // serializeMetricsData serializes metrics data for transmission. -func (cjs *CollectJobService) serializeMetricsData(metricsData []jobtypes.CollectRepMetricsData) ([]byte, error) { +func (cjs *CollectJobService) serializeMetricsData(metricsData []job.CollectRepMetricsData) ([]byte, error) { // TODO: Implement proper serialization using Apache Arrow format // For now, use JSON serialization as a placeholder return json.Marshal(metricsData) @@ -257,7 +257,7 @@ func (cjs *CollectJobService) serializeMetricsData(metricsData []jobtypes.Collec // syncJobTask represents a synchronous job execution task. type syncJobTask struct { - job *jobtypes.Job + job *job.Job service *CollectJobService logger logger.Logger } @@ -307,10 +307,10 @@ func (sjt *syncJobTask) Timeout() time.Duration { } // convertStringArraysToValueRows converts [][]string to []jobtypes.ValueRow -func convertStringArraysToValueRows(values [][]string) []jobtypes.ValueRow { - valueRows := make([]jobtypes.ValueRow, len(values)) +func convertStringArraysToValueRows(values [][]string) []job.ValueRow { + valueRows := make([]job.ValueRow, len(values)) for i, row := range values { - valueRows[i] = jobtypes.ValueRow{Columns: row} + valueRows[i] = job.ValueRow{Columns: row} } return valueRows } diff --git a/pkg/collector/common/dispatcher/entrance/collect_job_service_test.go b/internal/collector/common/dispatcher/entrance/collect_job_service_test.go similarity index 95% rename from pkg/collector/common/dispatcher/entrance/collect_job_service_test.go rename to internal/collector/common/dispatcher/entrance/collect_job_service_test.go index 59ed630..070e6c6 100644 --- a/pkg/collector/common/dispatcher/entrance/collect_job_service_test.go +++ b/internal/collector/common/dispatcher/entrance/collect_job_service_test.go @@ -22,10 +22,10 @@ package entrance // "testing" // "time" // -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/timer" -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/worker" -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" -// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/timer" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/worker" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/logger" +// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" //) // //func TestCollectJobService_BasicOperations(t *testing.T) { diff --git a/pkg/collector/common/dispatcher/entrance/collect_server.go b/internal/collector/common/dispatcher/entrance/collect_server.go similarity index 97% rename from pkg/collector/common/dispatcher/entrance/collect_server.go rename to internal/collector/common/dispatcher/entrance/collect_server.go index 263040e..1d18519 100644 --- a/pkg/collector/common/dispatcher/entrance/collect_server.go +++ b/internal/collector/common/dispatcher/entrance/collect_server.go @@ -22,11 +22,11 @@ import ( "sync" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/timer" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/worker" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/timer" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/worker" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // CollectServer manages the complete collection infrastructure including diff --git a/pkg/collector/common/dispatcher/entrance/config.go b/internal/collector/common/dispatcher/entrance/config.go similarity index 100% rename from pkg/collector/common/dispatcher/entrance/config.go rename to internal/collector/common/dispatcher/entrance/config.go diff --git a/pkg/collector/common/dispatcher/entrance/mysql_scheduling_test.go b/internal/collector/common/dispatcher/entrance/mysql_scheduling_test.go similarity index 91% rename from pkg/collector/common/dispatcher/entrance/mysql_scheduling_test.go rename to internal/collector/common/dispatcher/entrance/mysql_scheduling_test.go index 1b01c1a..7643c21 100644 --- a/pkg/collector/common/dispatcher/entrance/mysql_scheduling_test.go +++ b/internal/collector/common/dispatcher/entrance/mysql_scheduling_test.go @@ -25,18 +25,18 @@ import ( "time" "hertzbeat.apache.org/hertzbeat-collector-go/api" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/timer" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/worker" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/timer" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + logger2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/worker" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // TestMySQLJobScheduling tests the complete MySQL job scheduling flow // from CollectJobService to JDBC collection without requiring Manager connection func TestMySQLJobScheduling(t *testing.T) { - log := logger.DefaultLogger(os.Stdout, types.LogLevelInfo) + log := logger.DefaultLogger(os.Stdout, logger2.LogLevelInfo) t.Log("=== MySQL Job Scheduling Test (Mock Manager) ===") @@ -201,8 +201,8 @@ func testMockJobCancellation(t *testing.T, service *CollectJobService, log logge // Helper functions for creating test jobs // createMySQLCyclicJob creates a cyclic MySQL status monitoring job -func createMySQLCyclicJob() *jobtypes.Job { - return &jobtypes.Job{ +func createMySQLCyclicJob() *job.Job { + return &job.Job{ ID: time.Now().UnixNano(), TenantID: 1, MonitorID: 2001, @@ -214,13 +214,13 @@ func createMySQLCyclicJob() *jobtypes.Job { Metadata: map[string]string{ "description": "Cyclic MySQL status monitoring", }, - Metrics: []jobtypes.Metrics{ + Metrics: []job.Metrics{ { Name: "mysql-status", Protocol: "jdbc", Host: "localhost", Port: "53306", - JDBC: &jobtypes.JDBCProtocol{ + JDBC: &job.JDBCProtocol{ Host: "localhost", Port: "53306", Platform: "mysql", @@ -240,8 +240,8 @@ func createMySQLCyclicJob() *jobtypes.Job { } // createMySQLOneTimeJob creates a one-time MySQL variables check job -func createMySQLOneTimeJob() *jobtypes.Job { - return &jobtypes.Job{ +func createMySQLOneTimeJob() *job.Job { + return &job.Job{ ID: time.Now().UnixNano() + 1, TenantID: 1, MonitorID: 2002, @@ -253,13 +253,13 @@ func createMySQLOneTimeJob() *jobtypes.Job { Metadata: map[string]string{ "description": "One-time MySQL variables check", }, - Metrics: []jobtypes.Metrics{ + Metrics: []job.Metrics{ { Name: "mysql-variables", Protocol: "jdbc", Host: "localhost", Port: "53306", - JDBC: &jobtypes.JDBCProtocol{ + JDBC: &job.JDBCProtocol{ Host: "localhost", Port: "53306", Platform: "mysql", @@ -279,8 +279,8 @@ func createMySQLOneTimeJob() *jobtypes.Job { } // createMySQLLongRunningJob creates a job for cancellation testing -func createMySQLLongRunningJob() *jobtypes.Job { - return &jobtypes.Job{ +func createMySQLLongRunningJob() *job.Job { + return &job.Job{ ID: time.Now().UnixNano() + 2, TenantID: 1, MonitorID: 2003, @@ -292,13 +292,13 @@ func createMySQLLongRunningJob() *jobtypes.Job { Metadata: map[string]string{ "description": "Long running MySQL monitoring for cancellation test", }, - Metrics: []jobtypes.Metrics{ + Metrics: []job.Metrics{ { Name: "mysql-processes", Protocol: "jdbc", Host: "localhost", Port: "53306", - JDBC: &jobtypes.JDBCProtocol{ + JDBC: &job.JDBCProtocol{ Host: "localhost", Port: "53306", Platform: "mysql", @@ -320,7 +320,7 @@ func createMySQLLongRunningJob() *jobtypes.Job { // Helper functions for message serialization // serializeJob serializes a job to JSON for message transmission -func serializeJob(t *testing.T, job *jobtypes.Job) []byte { +func serializeJob(t *testing.T, job *job.Job) []byte { data, err := json.Marshal(job) if err != nil { t.Fatalf("Failed to serialize job: %v", err) diff --git a/pkg/collector/common/dispatcher/entrance/processors.go b/internal/collector/common/dispatcher/entrance/processors.go similarity index 98% rename from pkg/collector/common/dispatcher/entrance/processors.go rename to internal/collector/common/dispatcher/entrance/processors.go index 10d534f..c0fc33c 100644 --- a/pkg/collector/common/dispatcher/entrance/processors.go +++ b/internal/collector/common/dispatcher/entrance/processors.go @@ -23,8 +23,8 @@ import ( "fmt" "hertzbeat.apache.org/hertzbeat-collector-go/api" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // CollectJobServiceInterface defines the interface that CollectJobService should implement. diff --git a/pkg/collector/common/cache/.keep b/internal/collector/common/dispatcher/exporter/.keep similarity index 100% rename from pkg/collector/common/cache/.keep rename to internal/collector/common/dispatcher/exporter/.keep diff --git a/pkg/collector/common/job/job_server.go b/internal/collector/common/job/job_server.go similarity index 94% rename from pkg/collector/common/job/job_server.go rename to internal/collector/common/job/job_server.go index 1fe3669..3bcefa4 100644 --- a/pkg/collector/common/job/job_server.go +++ b/internal/collector/common/job/job_server.go @@ -20,7 +20,7 @@ package job import ( "context" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) type Server struct { diff --git a/pkg/collector/common/collect/dispatch/.keep b/internal/collector/common/ssh/.keep similarity index 100% rename from pkg/collector/common/collect/dispatch/.keep rename to internal/collector/common/ssh/.keep diff --git a/pkg/collector/common/timer/timer_dispatcher.go b/internal/collector/common/timer/timer_dispatcher.go similarity index 94% rename from pkg/collector/common/timer/timer_dispatcher.go rename to internal/collector/common/timer/timer_dispatcher.go index 215a406..d0cded6 100644 --- a/pkg/collector/common/timer/timer_dispatcher.go +++ b/internal/collector/common/timer/timer_dispatcher.go @@ -23,9 +23,9 @@ import ( "sync/atomic" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/worker" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/worker" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // CollectResponseEventListener defines the interface for handling collect response events @@ -62,8 +62,8 @@ type TimerDispatcher struct { // MetricsTimeoutInfo tracks timeout information for metrics collection tasks type MetricsTimeoutInfo struct { StartTime time.Time - Metrics *jobtypes.Metrics - Timeout *jobtypes.Timeout + Metrics *job.Metrics + Timeout *job.Timeout MaxDuration time.Duration RetryCount int MaxRetries int @@ -149,7 +149,7 @@ func (td *TimerDispatcher) Stop() error { } // AddJob adds a job to the scheduler -func (td *TimerDispatcher) AddJob(job *jobtypes.Job, eventListener CollectResponseEventListener) error { +func (td *TimerDispatcher) AddJob(job *job.Job, eventListener CollectResponseEventListener) error { if !td.started.Load() { td.logger.Info("collector is offline, cannot dispatch collect jobs") return fmt.Errorf("collector is offline") @@ -256,14 +256,14 @@ func (td *TimerDispatcher) DeleteJob(jobID int64, isCyclic bool) bool { if isCyclic { if timeoutInterface, exists := td.cyclicTasks.LoadAndDelete(jobID); exists { - if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok { + if timeout, ok := timeoutInterface.(*job.Timeout); ok { timeout.Cancel() removed = true } } } else { if timeoutInterface, exists := td.tempTasks.LoadAndDelete(jobID); exists { - if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok { + if timeout, ok := timeoutInterface.(*job.Timeout); ok { timeout.Cancel() removed = true } @@ -287,7 +287,7 @@ func (td *TimerDispatcher) GoOnline() { // Cancel all existing tasks td.cyclicTasks.Range(func(key, value interface{}) bool { - if timeout, ok := value.(*jobtypes.Timeout); ok { + if timeout, ok := value.(*job.Timeout); ok { timeout.Cancel() } td.cyclicTasks.Delete(key) @@ -295,7 +295,7 @@ func (td *TimerDispatcher) GoOnline() { }) td.tempTasks.Range(func(key, value interface{}) bool { - if timeout, ok := value.(*jobtypes.Timeout); ok { + if timeout, ok := value.(*job.Timeout); ok { timeout.Cancel() } td.tempTasks.Delete(key) @@ -319,7 +319,7 @@ func (td *TimerDispatcher) GoOffline() { // Cancel all existing tasks td.cyclicTasks.Range(func(key, value interface{}) bool { - if timeout, ok := value.(*jobtypes.Timeout); ok { + if timeout, ok := value.(*job.Timeout); ok { timeout.Cancel() } td.cyclicTasks.Delete(key) @@ -327,7 +327,7 @@ func (td *TimerDispatcher) GoOffline() { }) td.tempTasks.Range(func(key, value interface{}) bool { - if timeout, ok := value.(*jobtypes.Timeout); ok { + if timeout, ok := value.(*job.Timeout); ok { timeout.Cancel() } td.tempTasks.Delete(key) @@ -356,7 +356,7 @@ func (td *TimerDispatcher) ResponseSyncJobData(jobID int64, metricsData []interf } // DispatchMetricsTask implements MetricsTaskDispatcher interface -func (td *TimerDispatcher) DispatchMetricsTask(timeout *jobtypes.Timeout) error { +func (td *TimerDispatcher) DispatchMetricsTask(timeout *job.Timeout) error { if task, ok := timeout.Task().(*WheelTimerTask); ok { job := task.GetJob() td.logger.Info("dispatching metrics task", @@ -585,7 +585,7 @@ func (td *TimerDispatcher) retryMetricsCollection(timeoutInfo *MetricsTimeoutInf } // AddMetricsTimeout adds a metrics collection task to timeout monitoring -func (td *TimerDispatcher) AddMetricsTimeout(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout) { +func (td *TimerDispatcher) AddMetricsTimeout(metrics *job.Metrics, timeout *job.Timeout) { // Calculate max duration based on metrics timeout or default maxDuration := 120 * time.Second // default 2 minutes if metrics.Timeout != "" { @@ -614,7 +614,7 @@ func (td *TimerDispatcher) AddMetricsTimeout(metrics *jobtypes.Metrics, timeout } // RemoveMetricsTimeout removes a metrics collection task from timeout monitoring -func (td *TimerDispatcher) RemoveMetricsTimeout(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout) { +func (td *TimerDispatcher) RemoveMetricsTimeout(metrics *job.Metrics, timeout *job.Timeout) { key := td.generateTimeoutKey(metrics, timeout) td.timeoutMonitor.Delete(key) @@ -624,7 +624,7 @@ func (td *TimerDispatcher) RemoveMetricsTimeout(metrics *jobtypes.Metrics, timeo } // generateTimeoutKey generates a unique key for timeout monitoring -func (td *TimerDispatcher) generateTimeoutKey(metrics *jobtypes.Metrics, timeout *jobtypes.Timeout) string { +func (td *TimerDispatcher) generateTimeoutKey(metrics *job.Metrics, timeout *job.Timeout) string { // Use a combination of metrics name and timestamp to ensure uniqueness return fmt.Sprintf("%s-%d-%s", metrics.Name, time.Now().UnixNano(), metrics.Protocol) } diff --git a/pkg/collector/common/timer/timer_dispatcher_test.go b/internal/collector/common/timer/timer_dispatcher_test.go similarity index 98% rename from pkg/collector/common/timer/timer_dispatcher_test.go rename to internal/collector/common/timer/timer_dispatcher_test.go index 2a21b1a..b2c17ec 100644 --- a/pkg/collector/common/timer/timer_dispatcher_test.go +++ b/internal/collector/common/timer/timer_dispatcher_test.go @@ -23,8 +23,8 @@ package timer // "testing" // "time" // -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" -// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/logger" +// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" //) // //// mockMetricsDispatcher implements MetricsTaskDispatcher for testing diff --git a/pkg/collector/common/timer/timer_wheel.go b/internal/collector/common/timer/timer_wheel.go similarity index 98% rename from pkg/collector/common/timer/timer_wheel.go rename to internal/collector/common/timer/timer_wheel.go index 02aa065..39bda99 100644 --- a/pkg/collector/common/timer/timer_wheel.go +++ b/internal/collector/common/timer/timer_wheel.go @@ -24,8 +24,8 @@ import ( "sync/atomic" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) const ( diff --git a/pkg/collector/common/timer/timer_wheel_test.go b/internal/collector/common/timer/timer_wheel_test.go similarity index 97% rename from pkg/collector/common/timer/timer_wheel_test.go rename to internal/collector/common/timer/timer_wheel_test.go index fce916c..ad35feb 100644 --- a/pkg/collector/common/timer/timer_wheel_test.go +++ b/internal/collector/common/timer/timer_wheel_test.go @@ -23,8 +23,8 @@ package timer // "testing" // "time" // -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" -// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/logger" +// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" //) // //// mockTask implements TimerTask interface for testing diff --git a/pkg/collector/common/timer/wheel_timer_task.go b/internal/collector/common/timer/wheel_timer_task.go similarity index 89% rename from pkg/collector/common/timer/wheel_timer_task.go rename to internal/collector/common/timer/wheel_timer_task.go index 36fc2f6..478c8c1 100644 --- a/pkg/collector/common/timer/wheel_timer_task.go +++ b/internal/collector/common/timer/wheel_timer_task.go @@ -22,24 +22,24 @@ import ( "fmt" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // MetricsTaskDispatcher defines the interface for dispatching metrics collection tasks type MetricsTaskDispatcher interface { - DispatchMetricsTask(timeout *jobtypes.Timeout) error + DispatchMetricsTask(timeout *job.Timeout) error } // WheelTimerTask represents a task that wraps a job for execution in the timer wheel type WheelTimerTask struct { - job *jobtypes.Job + job *job.Job dispatcher MetricsTaskDispatcher logger logger.Logger } // NewWheelTimerTask creates a new wheel timer task -func NewWheelTimerTask(job *jobtypes.Job, dispatcher MetricsTaskDispatcher, logger logger.Logger) *WheelTimerTask { +func NewWheelTimerTask(job *job.Job, dispatcher MetricsTaskDispatcher, logger logger.Logger) *WheelTimerTask { task := &WheelTimerTask{ job: job.Clone(), // Clone to avoid data races dispatcher: dispatcher, @@ -53,7 +53,7 @@ func NewWheelTimerTask(job *jobtypes.Job, dispatcher MetricsTaskDispatcher, logg } // Run executes the timer task -func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error { +func (wtt *WheelTimerTask) Run(timeout *job.Timeout) error { if wtt.job == nil { return fmt.Errorf("job is nil") } @@ -76,7 +76,7 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error { } // GetJob returns the job associated with this task -func (wtt *WheelTimerTask) GetJob() *jobtypes.Job { +func (wtt *WheelTimerTask) GetJob() *job.Job { return wtt.job } @@ -87,7 +87,7 @@ func (wtt *WheelTimerTask) initJobMetrics() { } // Build configmap for parameter substitution - configMap := make(map[string]*jobtypes.Configmap) + configMap := make(map[string]*job.Configmap) for i := range wtt.job.Configmap { configMap[wtt.job.Configmap[i].Key] = &wtt.job.Configmap[i] @@ -106,7 +106,7 @@ func (wtt *WheelTimerTask) initJobMetrics() { } // Process metrics with parameter substitution - processedMetrics := make([]jobtypes.Metrics, 0, len(wtt.job.Metrics)) + processedMetrics := make([]job.Metrics, 0, len(wtt.job.Metrics)) for _, metric := range wtt.job.Metrics { // Serialize to JSON for placeholder replacement @@ -120,7 +120,7 @@ func (wtt *WheelTimerTask) initJobMetrics() { processedJSON := wtt.replacePlaceholders(string(metricJSON), configMap) // Deserialize back to metric - var processedMetric jobtypes.Metrics + var processedMetric job.Metrics if err := json.Unmarshal([]byte(processedJSON), &processedMetric); err != nil { wtt.logger.Error(err, "failed to unmarshal processed metric", "metric", metric.Name) continue @@ -139,7 +139,7 @@ func (wtt *WheelTimerTask) initJobMetrics() { } // replacePlaceholders replaces ${param} placeholders in the JSON string -func (wtt *WheelTimerTask) replacePlaceholders(jsonStr string, configMap map[string]*jobtypes.Configmap) string { +func (wtt *WheelTimerTask) replacePlaceholders(jsonStr string, configMap map[string]*job.Configmap) string { // This is a simplified implementation // In the real implementation, you would use a proper template engine // or regex-based replacement similar to the Java version @@ -158,7 +158,7 @@ func (wtt *WheelTimerTask) replacePlaceholders(jsonStr string, configMap map[str } // replaceFieldsForPushStyleMonitor handles special field replacement for push-style monitors -func (wtt *WheelTimerTask) replaceFieldsForPushStyleMonitor(metric *jobtypes.Metrics, configMap map[string]*jobtypes.Configmap) { +func (wtt *WheelTimerTask) replaceFieldsForPushStyleMonitor(metric *job.Metrics, configMap map[string]*job.Configmap) { // Implementation for push-style monitor field replacement // This would involve updating metric fields based on configMap values wtt.logger.Info("processing push-style monitor fields", "metric", metric.Name) diff --git a/pkg/collector/common/transport/transport.go b/internal/collector/common/transport/transport.go similarity index 95% rename from pkg/collector/common/transport/transport.go rename to internal/collector/common/transport/transport.go index 3a8778e..3a5b7bd 100644 --- a/pkg/collector/common/transport/transport.go +++ b/internal/collector/common/transport/transport.go @@ -20,7 +20,7 @@ package transport import ( "context" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // RemotingService interface: copy java netty diff --git a/pkg/types/config_types.go b/internal/collector/common/types/config/config_types.go similarity index 98% rename from pkg/types/config_types.go rename to internal/collector/common/types/config/config_types.go index 4c7b2b9..9eef9da 100644 --- a/pkg/types/config_types.go +++ b/internal/collector/common/types/config/config_types.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package types +package config type CollectorConfig struct { Collector CollectorSection `yaml:"collector"` diff --git a/internal/collector/common/types/err/error_types.go b/internal/collector/common/types/err/error_types.go new file mode 100644 index 0000000..4718abe --- /dev/null +++ b/internal/collector/common/types/err/error_types.go @@ -0,0 +1,9 @@ +package err + +import "errors" + +// Collector Server Error Types +var ( + CollectorIPIsNull = errors.New("collector ip is empty") + CollectorPortIsNull = errors.New("collector port is empty") +) diff --git a/pkg/types/job/job_types.go b/internal/collector/common/types/job/job_types.go similarity index 100% rename from pkg/types/job/job_types.go rename to internal/collector/common/types/job/job_types.go diff --git a/pkg/types/job/metrics_types.go b/internal/collector/common/types/job/metrics_types.go similarity index 100% rename from pkg/types/job/metrics_types.go rename to internal/collector/common/types/job/metrics_types.go diff --git a/pkg/types/job/protocol/common_request_protocol.go b/internal/collector/common/types/job/protocol/common_request_protocol.go similarity index 100% rename from pkg/types/job/protocol/common_request_protocol.go rename to internal/collector/common/types/job/protocol/common_request_protocol.go diff --git a/pkg/types/job/protocol/consul_sd_protocol.go b/internal/collector/common/types/job/protocol/consul_sd_protocol.go similarity index 94% rename from pkg/types/job/protocol/consul_sd_protocol.go rename to internal/collector/common/types/job/protocol/consul_sd_protocol.go index 6090c9f..6dafb48 100644 --- a/pkg/types/job/protocol/consul_sd_protocol.go +++ b/internal/collector/common/types/job/protocol/consul_sd_protocol.go @@ -17,7 +17,9 @@ package protocol -import "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" +import ( + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) type ConsulSdProtocol struct { Host string diff --git a/pkg/types/job/protocol/ssh_protocol.go b/internal/collector/common/types/job/protocol/ssh_protocol.go similarity index 100% rename from pkg/types/job/protocol/ssh_protocol.go rename to internal/collector/common/types/job/protocol/ssh_protocol.go diff --git a/pkg/types/job/protocol/zookeeper_sd_protocol.go b/internal/collector/common/types/job/protocol/zookeeper_sd_protocol.go similarity index 95% rename from pkg/types/job/protocol/zookeeper_sd_protocol.go rename to internal/collector/common/types/job/protocol/zookeeper_sd_protocol.go index ab5db4d..44dccbd 100644 --- a/pkg/types/job/protocol/zookeeper_sd_protocol.go +++ b/internal/collector/common/types/job/protocol/zookeeper_sd_protocol.go @@ -20,7 +20,7 @@ package protocol import ( "errors" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) var ( diff --git a/pkg/types/job/timeout_types.go b/internal/collector/common/types/job/timeout_types.go similarity index 100% rename from pkg/types/job/timeout_types.go rename to internal/collector/common/types/job/timeout_types.go diff --git a/pkg/types/logging_types.go b/internal/collector/common/types/logger/logging_types.go similarity index 86% rename from pkg/types/logging_types.go rename to internal/collector/common/types/logger/logging_types.go index 1ee1da9..3838d8d 100644 --- a/pkg/types/logging_types.go +++ b/internal/collector/common/types/logger/logging_types.go @@ -15,26 +15,26 @@ // specific language governing permissions and limitations // under the License. -package types +package logger -// hertzbeat logging related types +// hertzbeat logger related types type LogLevel string const ( - // LogLevelTrace defines the "Trace" logging level. + // LogLevelTrace defines the "Trace" logger level. LogLevelTrace LogLevel = "trace" - // LogLevelDebug defines the "debug" logging level. + // LogLevelDebug defines the "debug" logger level. LogLevelDebug LogLevel = "debug" - // LogLevelInfo defines the "Info" logging level. + // LogLevelInfo defines the "Info" logger level. LogLevelInfo LogLevel = "info" - // LogLevelWarn defines the "Warn" logging level. + // LogLevelWarn defines the "Warn" logger level. LogLevelWarn LogLevel = "warn" - // LogLevelError defines the "Error" logging level. + // LogLevelError defines the "Error" logger level. LogLevelError LogLevel = "error" ) diff --git a/pkg/collector/config/config.go b/internal/collector/config/config.go similarity index 88% rename from pkg/collector/config/config.go rename to internal/collector/config/config.go index 540ad78..0acce7f 100644 --- a/pkg/collector/config/config.go +++ b/internal/collector/config/config.go @@ -23,10 +23,9 @@ import ( "os" "gopkg.in/yaml.v3" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/server" - - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/server" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) const ( @@ -57,7 +56,7 @@ func New(cfgPath string, server *server.CollectorServer, f HookFunc) *Loader { } } -func (ld *Loader) LoadConfig() (*types.CollectorConfig, error) { +func (ld *Loader) LoadConfig() (*config.CollectorConfig, error) { ld.runHook() @@ -82,7 +81,7 @@ func (ld *Loader) LoadConfig() (*types.CollectorConfig, error) { } }(file) - var cfg types.CollectorConfig + var cfg config.CollectorConfig decoder := yaml.NewDecoder(file) if err := decoder.Decode(&cfg); err != nil { ld.logger.Error(err, "decode config file failed") @@ -92,7 +91,7 @@ func (ld *Loader) LoadConfig() (*types.CollectorConfig, error) { return &cfg, nil } -func (ld *Loader) ValidateConfig(cfg *types.CollectorConfig) error { +func (ld *Loader) ValidateConfig(cfg *config.CollectorConfig) error { if cfg == nil { ld.logger.Sugar().Debug("collector-config-loader is nil") diff --git a/pkg/collector/config/config_test.go b/internal/collector/config/config_test.go similarity index 96% rename from pkg/collector/config/config_test.go rename to internal/collector/config/config_test.go index f905c84..2696b17 100644 --- a/pkg/collector/config/config_test.go +++ b/internal/collector/config/config_test.go @@ -21,7 +21,7 @@ import ( "os" "testing" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/server" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/server" ) func TestLoadConfig(t *testing.T) { diff --git a/pkg/collector/common/dispatcher/exporter/.keep b/internal/collector/extension/kafka/.keep similarity index 100% rename from pkg/collector/common/dispatcher/exporter/.keep rename to internal/collector/extension/kafka/.keep diff --git a/pkg/collector/common/ssh/.keep b/internal/collector/extension/mongodb/.keep similarity index 100% rename from pkg/collector/common/ssh/.keep rename to internal/collector/extension/mongodb/.keep diff --git a/pkg/collector/extension/kafka/.keep b/internal/collector/extension/nebulagraph/.keep similarity index 100% rename from pkg/collector/extension/kafka/.keep rename to internal/collector/extension/nebulagraph/.keep diff --git a/pkg/collector/extension/mongodb/.keep b/internal/collector/extension/rocketmq/.keep similarity index 100% rename from pkg/collector/extension/mongodb/.keep rename to internal/collector/extension/rocketmq/.keep diff --git a/pkg/collector/registry.go b/internal/collector/registry.go similarity index 88% rename from pkg/collector/registry.go rename to internal/collector/registry.go index d4a035f..877deb6 100644 --- a/pkg/collector/registry.go +++ b/internal/collector/registry.go @@ -18,11 +18,11 @@ package collector import ( - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/registry" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/registry" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" // 导入所有采集器以触发自动注册 - _ "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic/database" + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database" ) // RegisterBuiltinCollectors 自动注册所有内置采集器 diff --git a/pkg/collector/registry/registry_center.go b/internal/collector/registry/registry_center.go similarity index 97% rename from pkg/collector/registry/registry_center.go rename to internal/collector/registry/registry_center.go index 0cc8a4e..7bf1ae3 100644 --- a/pkg/collector/registry/registry_center.go +++ b/internal/collector/registry/registry_center.go @@ -21,8 +21,8 @@ import ( "fmt" "sync" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/basic" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // CollectorFactory 采集器工厂函数类型 diff --git a/internal/collector/server/server.go b/internal/collector/server/server.go new file mode 100644 index 0000000..5661686 --- /dev/null +++ b/internal/collector/server/server.go @@ -0,0 +1,44 @@ +package server + +import ( + "io" + + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/err" + logger2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +const ( + HertzbeatCollectorGoName = "HertzbeatCollectorGoImpl" +) + +type Server struct { + config *config.CollectorConfig + logger logger.Logger + Name string +} + +func New(cfg *config.CollectorConfig, logOut io.Writer) *Server { + + return &Server{ + config: cfg, + Name: HertzbeatCollectorGoName, + logger: logger.DefaultLogger(logOut, logger2.LogLevelInfo), + } +} + +func (s *Server) Validate() error { + + if s.config.Collector.Info.IP == "" { + s.logger.Error(err.CollectorIPIsNull, "collector server start failed") + return err.CollectorPortIsNull + } + + if s.config.Collector.Info.Port == "" { + s.logger.Error(err.CollectorPortIsNull, "collector server start failed") + return err.CollectorPortIsNull + } + + return nil +} diff --git a/pkg/collector/server/server_test.go b/internal/collector/server/server_test.go similarity index 100% rename from pkg/collector/server/server_test.go rename to internal/collector/server/server_test.go diff --git a/pkg/collector/worker/metrics_collect.go b/internal/collector/worker/metrics_collect.go similarity index 94% rename from pkg/collector/worker/metrics_collect.go rename to internal/collector/worker/metrics_collect.go index bdefc2b..72a5940 100644 --- a/pkg/collector/worker/metrics_collect.go +++ b/internal/collector/worker/metrics_collect.go @@ -21,8 +21,8 @@ import ( "fmt" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // MetricsData represents collected metrics data @@ -35,7 +35,7 @@ type MetricsData struct { Time int64 `json:"time"` Code int `json:"code"` Msg string `json:"msg"` - Fields []jobtypes.Field `json:"fields"` + Fields []job.Field `json:"fields"` Values [][]string `json:"values"` Metadata map[string]string `json:"metadata"` Labels map[string]string `json:"labels"` @@ -43,12 +43,12 @@ type MetricsData struct { // CollectDataDispatcher defines the interface for dispatching collected data type CollectDataDispatcher interface { - DispatchCollectData(timeout *jobtypes.Timeout, metrics *jobtypes.Metrics, data []*MetricsData) error + DispatchCollectData(timeout *job.Timeout, metrics *job.Metrics, data []*MetricsData) error } // CollectService defines the interface for metrics collection service type CollectService interface { - Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData + Collect(metrics *job.Metrics) *job.CollectRepMetricsData } // MetricsDataBuilder helps build metrics data @@ -61,7 +61,7 @@ func NewMetricsDataBuilder() *MetricsDataBuilder { return &MetricsDataBuilder{ data: &MetricsData{ Time: time.Now().UnixMilli(), - Fields: make([]jobtypes.Field, 0), + Fields: make([]job.Field, 0), Values: make([][]string, 0), Metadata: make(map[string]string), Labels: make(map[string]string), @@ -106,7 +106,7 @@ func (b *MetricsDataBuilder) SetMsg(msg string) *MetricsDataBuilder { } // SetFields sets the field definitions -func (b *MetricsDataBuilder) SetFields(fields []jobtypes.Field) *MetricsDataBuilder { +func (b *MetricsDataBuilder) SetFields(fields []job.Field) *MetricsDataBuilder { b.data.Fields = fields return b } @@ -149,13 +149,13 @@ type MetricsCollect struct { tenantID int64 id int64 app string - metrics *jobtypes.Metrics + metrics *job.Metrics metadata map[string]string labels map[string]string annotations map[string]string // Task control - timeout *jobtypes.Timeout + timeout *job.Timeout collectDispatcher CollectDataDispatcher priority int isCyclic bool @@ -173,8 +173,8 @@ type MetricsCollect struct { // NewMetricsCollect creates a new metrics collection task func NewMetricsCollect( - metrics *jobtypes.Metrics, - timeout *jobtypes.Timeout, + metrics *job.Metrics, + timeout *job.Timeout, dispatcher CollectDataDispatcher, collectorIdentity string, collectService CollectService, @@ -307,7 +307,7 @@ func (mc *MetricsCollect) collectStandard(builder *MetricsDataBuilder) []*Metric } // convertToMetricsData converts CollectRepMetricsData to MetricsData -func (mc *MetricsCollect) convertToMetricsData(collectedData *jobtypes.CollectRepMetricsData, builder *MetricsDataBuilder) *MetricsData { +func (mc *MetricsCollect) convertToMetricsData(collectedData *job.CollectRepMetricsData, builder *MetricsDataBuilder) *MetricsData { // Set basic fields from builder (which has ID, App, TenantID, etc.) builder.SetCode(collectedData.Code). SetMsg(collectedData.Msg). diff --git a/pkg/collector/worker/metrics_collect_integration_test.go b/internal/collector/worker/metrics_collect_integration_test.go similarity index 95% rename from pkg/collector/worker/metrics_collect_integration_test.go rename to internal/collector/worker/metrics_collect_integration_test.go index 938370a..0bae568 100644 --- a/pkg/collector/worker/metrics_collect_integration_test.go +++ b/internal/collector/worker/metrics_collect_integration_test.go @@ -21,10 +21,10 @@ package worker // "os" // "testing" // -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector" -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" -// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types/job" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/logger" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/types" +// jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" //) // //// MockCollectDataDispatcher implements CollectDataDispatcher for testing diff --git a/pkg/collector/worker/task_queue.go b/internal/collector/worker/task_queue.go similarity index 99% rename from pkg/collector/worker/task_queue.go rename to internal/collector/worker/task_queue.go index 33b4827..148d1f6 100644 --- a/pkg/collector/worker/task_queue.go +++ b/internal/collector/worker/task_queue.go @@ -23,7 +23,7 @@ import ( "sync" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // TaskQueue manages a priority queue of collection tasks diff --git a/pkg/collector/worker/task_queue_test.go b/internal/collector/worker/task_queue_test.go similarity index 99% rename from pkg/collector/worker/task_queue_test.go rename to internal/collector/worker/task_queue_test.go index 294f910..900371a 100644 --- a/pkg/collector/worker/task_queue_test.go +++ b/internal/collector/worker/task_queue_test.go @@ -22,7 +22,7 @@ package worker // "testing" // "time" // -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/logger" //) // //func TestTaskQueue_BasicOperations(t *testing.T) { diff --git a/pkg/collector/worker/worker_pool.go b/internal/collector/worker/worker_pool.go similarity index 99% rename from pkg/collector/worker/worker_pool.go rename to internal/collector/worker/worker_pool.go index 3417c72..61c1b4e 100644 --- a/pkg/collector/worker/worker_pool.go +++ b/internal/collector/worker/worker_pool.go @@ -25,7 +25,7 @@ import ( "sync/atomic" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // Task represents a task that can be executed by the worker pool diff --git a/pkg/collector/worker/worker_pool_test.go b/internal/collector/worker/worker_pool_test.go similarity index 99% rename from pkg/collector/worker/worker_pool_test.go rename to internal/collector/worker/worker_pool_test.go index ae3c9ba..f7b0a62 100644 --- a/pkg/collector/worker/worker_pool_test.go +++ b/internal/collector/worker/worker_pool_test.go @@ -23,7 +23,7 @@ package worker // "testing" // "time" // -// "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" +// "hertzbeat.apache.org/hertzbeat-collector-go/internal/logger" //) // //// mockTask implements Task interface for testing diff --git a/pkg/constants/const.go b/internal/constants/const.go similarity index 100% rename from pkg/constants/const.go rename to internal/constants/const.go diff --git a/pkg/logger/logger.go b/internal/util/logger/logger.go similarity index 75% rename from pkg/logger/logger.go rename to internal/util/logger/logger.go index bd2bf93..90bdb23 100644 --- a/pkg/logger/logger.go +++ b/internal/util/logger/logger.go @@ -25,20 +25,19 @@ import ( "github.com/go-logr/zapr" "go.uber.org/zap" "go.uber.org/zap/zapcore" - - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" ) type Logger struct { logr.Logger out io.Writer - logging *types.HertzBeatLogging + logging *logger.HertzBeatLogging sugaredLogger *zap.SugaredLogger } -func NewLogger(w io.Writer, logging *types.HertzBeatLogging) Logger { +func NewLogger(w io.Writer, logging *logger.HertzBeatLogging) Logger { - logger := initZapLogger(w, logging, logging.Level[types.LogComponentHertzbeatDefault]) + logger := initZapLogger(w, logging, logging.Level[logger.LogComponentHertzbeatDefault]) return Logger{ Logger: zapr.NewLogger(logger), @@ -48,14 +47,14 @@ func NewLogger(w io.Writer, logging *types.HertzBeatLogging) Logger { } } -func FileLogger(file, name string, level types.LogLevel) Logger { +func FileLogger(file, name string, level logger.LogLevel) Logger { writer, err := os.OpenFile(file, os.O_WRONLY, 0o666) if err != nil { panic(err) } - logging := types.DefaultHertzbeatLogging() + logging := logger.DefaultHertzbeatLogging() logger := initZapLogger(writer, logging, level) return Logger{ @@ -66,9 +65,9 @@ func FileLogger(file, name string, level types.LogLevel) Logger { } } -func DefaultLogger(out io.Writer, level types.LogLevel) Logger { +func DefaultLogger(out io.Writer, level logger.LogLevel) Logger { - logging := types.DefaultHertzbeatLogging() + logging := logger.DefaultHertzbeatLogging() logger := initZapLogger(out, logging, level) return Logger{ @@ -86,7 +85,7 @@ func DefaultLogger(out io.Writer, level types.LogLevel) Logger { // more information). func (l Logger) WithName(name string) Logger { - logLevel := l.logging.Level[types.HertzbeatLogComponent(name)] + logLevel := l.logging.Level[logger.HertzbeatLogComponent(name)] logger := initZapLogger(l.out, l.logging, logLevel) return Logger{ @@ -109,26 +108,26 @@ func (l Logger) WithValues(keysAndValues ...interface{}) Logger { // verbose, API. Any Logger can be converted to a SugaredLogger with its Sugar // method. // -// Unlike the Logger, the SugaredLogger doesn't insist on structured logging. +// Unlike the Logger, the SugaredLogger doesn't insist on structured logger. // For each log level, it exposes four methods: // -// - methods named after the log level for log.Print-style logging -// - methods ending in "w" for loosely-typed structured logging -// - methods ending in "f" for log.Printf-style logging -// - methods ending in "ln" for log.Println-style logging +// - methods named after the log level for log.Print-style logger +// - methods ending in "w" for loosely-typed structured logger +// - methods ending in "f" for log.Printf-style logger +// - methods ending in "ln" for log.Println-style logger // // For example, the methods for InfoLevel are: // -// Info(...any) Print-style logging -// Infow(...any) Structured logging (read as "info with") -// Infof(string, ...any) Printf-style logging -// Infoln(...any) Println-style logging +// Info(...any) Print-style logger +// Infow(...any) Structured logger (read as "info with") +// Infof(string, ...any) Printf-style logger +// Infoln(...any) Println-style logger func (l Logger) Sugar() *zap.SugaredLogger { return l.sugaredLogger } -func initZapLogger(w io.Writer, logging *types.HertzBeatLogging, level types.LogLevel) *zap.Logger { +func initZapLogger(w io.Writer, logging *logger.HertzBeatLogging, level logger.LogLevel) *zap.Logger { parseLevel, _ := zapcore.ParseLevel(string(logging.DefaultHertzBeatLoggingLevel(level))) core := zapcore.NewCore(zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), zapcore.AddSync(w), zap.NewAtomicLevelAt(parseLevel)) diff --git a/pkg/logger/logger_test.go b/internal/util/logger/logger_test.go similarity index 78% rename from pkg/logger/logger_test.go rename to internal/util/logger/logger_test.go index c1389c9..3a9d5de 100644 --- a/pkg/logger/logger_test.go +++ b/internal/util/logger/logger_test.go @@ -26,8 +26,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" - - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" ) func TestZapLogLevel(t *testing.T) { @@ -44,17 +43,17 @@ func TestZapLogLevel(t *testing.T) { } func TestLogger(t *testing.T) { - logger := NewLogger(os.Stdout, types.DefaultHertzbeatLogging()) + logger := NewLogger(os.Stdout, logger.DefaultHertzbeatLogging()) logger.Info("kv msg", "key", "value") logger.Sugar().Infof("template %s %d", "string", 123) - logger.WithName(string(types.LogComponentHertzbeatCollector)).WithValues("runner", types.LogComponentHertzbeatCollector).Info("msg", "k", "v") + logger.WithName(string(logger.LogComponentHertzbeatCollector)).WithValues("runner", logger.LogComponentHertzbeatCollector).Info("msg", "k", "v") - defaultLogger := DefaultLogger(os.Stdout, types.LogLevelInfo) + defaultLogger := DefaultLogger(os.Stdout, logger.LogLevelInfo) assert.NotNil(t, defaultLogger.logging) assert.NotNil(t, defaultLogger.sugaredLogger) - fileLogger := FileLogger("/dev/stderr", "fl-test", types.LogLevelInfo) + fileLogger := FileLogger("/dev/stderr", "fl-test", logger.LogLevelInfo) assert.NotNil(t, fileLogger.logging) assert.NotNil(t, fileLogger.sugaredLogger) } @@ -71,10 +70,10 @@ func TestLoggerWithName(t *testing.T) { require.NoError(t, err) }() - config := types.DefaultHertzbeatLogging() - config.Level[types.LogComponentHertzbeatCollector] = types.LogLevelDebug + config := logger.DefaultHertzbeatLogging() + config.Level[logger.LogComponentHertzbeatCollector] = logger.LogLevelDebug - logger := NewLogger(os.Stdout, config).WithName(string(types.LogComponentHertzbeatCollector)) + logger := NewLogger(os.Stdout, config).WithName(string(logger.LogComponentHertzbeatCollector)) logger.Info("info message") logger.Sugar().Debugf("debug message") @@ -83,7 +82,7 @@ func TestLoggerWithName(t *testing.T) { _, err := r.Read(outputBytes) require.NoError(t, err) capturedOutput := string(outputBytes) - assert.Contains(t, capturedOutput, string(types.LogComponentHertzbeatCollector)) + assert.Contains(t, capturedOutput, string(logger.LogComponentHertzbeatCollector)) assert.Contains(t, capturedOutput, "info message") assert.Contains(t, capturedOutput, "debug message") } @@ -102,8 +101,8 @@ func TestLoggerSugarName(t *testing.T) { const logName = "loggerName" - config := types.DefaultHertzbeatLogging() - config.Level[logName] = types.LogLevelDebug + config := logger.DefaultHertzbeatLogging() + config.Level[logName] = logger.LogLevelDebug logger := NewLogger(os.Stdout, config).WithName(logName) diff --git a/internal/util/timer/timer.go b/internal/util/timer/timer.go new file mode 100644 index 0000000..92a31aa --- /dev/null +++ b/internal/util/timer/timer.go @@ -0,0 +1,8 @@ +package timer + +import "time" + +// GetCurrentTimeMillis returns current time in milliseconds +func GetCurrentTimeMillis() int64 { + return time.Now().UnixMilli() +} diff --git a/pkg/collector/bootstrap.go b/pkg/collector/bootstrap.go deleted file mode 100644 index 2e02be1..0000000 --- a/pkg/collector/bootstrap.go +++ /dev/null @@ -1,133 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package collector - -import ( - "context" - "os" - "os/signal" - "syscall" - - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/banner" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" -) - -func Bootstrap(confPath, version string) error { - // Initialize logger - log := logger.DefaultLogger(os.Stdout, types.LogLevelInfo) - log.Info("starting HertzBeat Collector Go", "version", version) - - // Print banner - bannerPrinter := banner.New(&BannerAdapter{logger: log}) - err := bannerPrinter.PrintBannerWithVersion("HertzBeat-Collector-Go", "1159", version) - if err != nil { - log.Error(err, "failed to print banner") - return err - } - - // Create and initialize the collector application - app := NewCollectorApp(confPath, version, log) - - // Setup graceful shutdown - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go func() { - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - <-sigCh - log.Info("received shutdown signal") - cancel() - }() - - // Start the application - if err := app.Start(); err != nil { - log.Error(err, "failed to start collector application") - return err - } - - // Wait for shutdown signal - <-ctx.Done() - - // Graceful shutdown - log.Info("shutting down collector application") - if err := app.Stop(); err != nil { - log.Error(err, "error during shutdown") - return err - } - - log.Info("HertzBeat Collector Go stopped successfully") - return nil -} - -// CollectorApp wraps the complete collector application -type CollectorApp struct { - confPath string - version string - logger logger.Logger - - // Core components - collectService *CollectService -} - -// NewCollectorApp creates a new collector application -func NewCollectorApp(confPath, version string, logger logger.Logger) *CollectorApp { - return &CollectorApp{ - confPath: confPath, - version: version, - logger: logger.WithName("collector-app"), - } -} - -// Start starts the collector application -func (app *CollectorApp) Start() error { - app.logger.Info("initializing collector application", "version", app.version) - - // Initialize collect service with enhanced scheduling - app.collectService = NewCollectService(app.logger) - - // Register built-in collectors - RegisterBuiltinCollectors(app.collectService, app.logger) - - app.logger.Info("collector application started successfully") - return nil -} - -// Stop stops the collector application -func (app *CollectorApp) Stop() error { - app.logger.Info("stopping collector application") - - // TODO: Implement graceful shutdown of components - - app.logger.Info("collector application stopped successfully") - return nil -} - -// BannerAdapter adapts logger interface for banner printer -type BannerAdapter struct { - logger logger.Logger -} - -func (ba *BannerAdapter) Error(err error, msg string) { - ba.logger.Error(err, msg) -} - -func (ba *BannerAdapter) Info(msg string, keysAndValues ...interface{}) { - ba.logger.Info(msg, keysAndValues...) -} diff --git a/pkg/collector/extension/nebulagraph/.keep b/pkg/collector/extension/nebulagraph/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/pkg/collector/extension/rocketmq/.keep b/pkg/collector/extension/rocketmq/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/pkg/collector/server/server.go b/pkg/collector/server/server.go deleted file mode 100644 index 53af10c..0000000 --- a/pkg/collector/server/server.go +++ /dev/null @@ -1,86 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package server - -import ( - "context" - "os" - - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/job" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/collector/common/transport" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/pkg/types" -) - -const ( - DefaultHertzBeatCollectorVersion = "0.0.1-DEV" -) - -type Run interface { - Start(ctx context.Context) error - Close() error -} - -// CollectorServer HertzBeat Collector Server -type CollectorServer struct { - Version string - Logger logger.Logger - - job *job.Server - transport *transport.Server -} - -func NewCollectorServer(version string) *CollectorServer { - - if version == "" { - version = DefaultHertzBeatCollectorVersion - } - - return &CollectorServer{ - Version: version, - Logger: logger.DefaultLogger(os.Stdout, types.LogLevelDebug), - } -} - -func (s *CollectorServer) Start(ctx context.Context) error { - - s.Logger.Info("hi, starting collector server...") - - // start job server - s.job = job.NewServer(s.Logger.WithName("job")) - - // init and start transport server - s.transport = transport.NewServer(s.Logger.WithName("transport")) - - // Wait until done - <-ctx.Done() - - return nil -} - -func (s *CollectorServer) Validate() error { - - return nil -} - -// Close Shutdown the server hook -func (s *CollectorServer) Close() error { - - s.Logger.Info("collector server shutting down... bye!") - return nil -} diff --git a/pkg/util/.keep b/pkg/util/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/tools/ci-config/labeler.yml b/tools/ci-config/labeler.yml index 25d4f2a..8f45d87 100644 --- a/tools/ci-config/labeler.yml +++ b/tools/ci-config/labeler.yml @@ -41,7 +41,7 @@ - changed-files: - any-glob-to-any-file: - "cmd/**/*" - - "pkg/**/*" + - "internal/**/*" - "api/**/*" "area/example": diff --git a/tools/make/golang.mk b/tools/make/golang.mk index 52d70e2..2a87379 100644 --- a/tools/make/golang.mk +++ b/tools/make/golang.mk @@ -15,6 +15,13 @@ # limitations under the License. # +VERSION_PACKAGE := hertzbeat.apache.org/hertzbeat-collector-go/internal/cmd/version + +GO_LDFLAGS += -X $(VERSION_PACKAGE).hcgVersion=$(shell cat VERSION) \ + -X $(VERSION_PACKAGE).gitCommitID=$(GIT_COMMIT) + +GIT_COMMIT:=$(shell git rev-parse HEAD) + ##@ Golang .PHONY: @@ -33,7 +40,7 @@ dev: ## Golang dev, run main by run. # build build: ## Golang build @version=$$(cat VERSION); \ - mkdir -p bin/ && go build -ldflags "-X main.Version=$(VERSION)" -o ./bin/ ./... + CGO_ENABLED=0 go build -o bin/collector -ldflags "$(GO_LDFLAGS)" cmd/main.go .PHONY: init init: ## install base. For proto compile. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
