This is an automated email from the ASF dual-hosted git repository.
gaoxingcun pushed a commit to branch feature/go-collector-arrow-communication
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to
refs/heads/feature/go-collector-arrow-communication by this push:
new 5f1f311 add:
5f1f311 is described below
commit 5f1f311b5e28b48df9fc08c44dc476117a2cfc4d
Author: TJxiaobao <[email protected]>
AuthorDate: Tue Oct 14 23:34:07 2025 +0800
add:
1. Add Apache Arrow serialization for metrics data compatibility
2. Implement complete result handling and message routing
3. Fix MonitorID matching between Go collector and Java Manager
4. Add one-time and cyclic task result aggregation
5. Integrate message router with job scheduler
---
go.mod | 11 ++++++
go.sum | 27 +++++++++++++
internal/cmd/server.go | 17 +++++---
.../common/collect/dispatch/metrics_collector.go | 4 +-
.../common/collect/lazy_message_router.go | 7 +++-
.../collector/common/collect/result_handler.go | 46 +++++++++++++---------
internal/collector/common/job/job_server.go | 7 ++--
internal/collector/common/router/message_router.go | 6 +--
internal/transport/processors.go | 4 +-
9 files changed, 94 insertions(+), 35 deletions(-)
diff --git a/go.mod b/go.mod
index 9d01f03..88dc66f 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module hertzbeat.apache.org/hertzbeat-collector-go
go 1.24.6
require (
+ github.com/apache/arrow/go/v13 v13.0.0
github.com/go-logr/logr v1.4.3
github.com/go-logr/zapr v1.3.0
github.com/go-sql-driver/mysql v1.9.3
@@ -22,21 +23,31 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc //
indirect
+ github.com/goccy/go-json v0.10.0 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 //
indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
+ github.com/google/flatbuffers v23.1.21+incompatible // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
+ github.com/klauspost/compress v1.18.0 // indirect
+ github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 //
indirect
+ github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 //
indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/spf13/pflag v1.0.9 // indirect
+ github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.39.0 // indirect
+ golang.org/x/mod v0.25.0 // indirect
golang.org/x/net v0.41.0 // indirect
+ golang.org/x/sync v0.15.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.26.0 // indirect
+ golang.org/x/tools v0.33.0 // indirect
+ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto/googleapis/rpc
v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
)
diff --git a/go.sum b/go.sum
index 333de92..1e8a909 100644
--- a/go.sum
+++ b/go.sum
@@ -12,6 +12,8 @@
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1 h1:bFWuo
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1/go.mod
h1:Vih/3yc6yac2JzU4hzpaDupBJP0Flaia9rXXrU8xyww=
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2
h1:oygO0locgZJe7PpYPXT5A29ZkwJaPqcva7BVeemZOZs=
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2/go.mod
h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
+github.com/apache/arrow/go/v13 v13.0.0
h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk=
+github.com/apache/arrow/go/v13 v13.0.0/go.mod
h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -27,6 +29,8 @@ github.com/go-logr/zapr v1.3.0
h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ=
github.com/go-logr/zapr v1.3.0/go.mod
h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg=
github.com/go-sql-driver/mysql v1.9.3
h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo=
github.com/go-sql-driver/mysql v1.9.3/go.mod
h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
+github.com/goccy/go-json v0.10.0
h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
+github.com/goccy/go-json v0.10.0/go.mod
h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang-jwt/jwt/v5 v5.2.2
h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod
h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9
h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA=
@@ -35,12 +39,18 @@ github.com/golang-sql/sqlexp v0.1.0
h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei
github.com/golang-sql/sqlexp v0.1.0/go.mod
h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI=
github.com/golang/protobuf v1.5.4
h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod
h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/google/flatbuffers v23.1.21+incompatible
h1:bUqzx/MXCDxuS0hRJL2EfjyZL3uQrPbMocUa8zGqsTA=
+github.com/google/flatbuffers v23.1.21+incompatible/go.mod
h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/inconshreveable/mousetrap v1.1.0
h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod
h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
+github.com/klauspost/compress v1.18.0
h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
+github.com/klauspost/compress v1.18.0/go.mod
h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
+github.com/klauspost/cpuid/v2 v2.2.3
h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU=
+github.com/klauspost/cpuid/v2 v2.2.3/go.mod
h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod
h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -53,6 +63,8 @@ github.com/microsoft/go-mssqldb v1.9.3
h1:hy4p+LDC8LIGvI3JATnLVmBOLMJbmn5X400mr5
github.com/microsoft/go-mssqldb v1.9.3/go.mod
h1:GBbW9ASTiDC+mpgWDGKdm3FnFLTUsLYN3iFL90lQ+PA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod
h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
+github.com/pierrec/lz4/v4 v4.1.17
h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
+github.com/pierrec/lz4/v4 v4.1.17/go.mod
h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c
h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod
h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
@@ -74,6 +86,10 @@ github.com/spf13/pflag v1.0.9
h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
github.com/spf13/pflag v1.0.9/go.mod
h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.10.0
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
+github.com/zeebo/assert v1.3.0/go.mod
h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
+github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
+github.com/zeebo/xxh3 v1.0.2/go.mod
h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.opentelemetry.io/auto/sdk v1.1.0
h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod
h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.37.0
h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
@@ -94,12 +110,23 @@ go.uber.org/zap v1.27.0
h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
golang.org/x/crypto v0.39.0/go.mod
h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
+golang.org/x/exp v0.0.0-20230206171751-46f607a40771
h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg=
+golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod
h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
+golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
+golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
+golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
+golang.org/x/sync v0.15.0/go.mod
h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
+golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod
h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
+golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc=
+golang.org/x/tools v0.33.0/go.mod
h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod
h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod
h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7
h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY=
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 1e98ceb..687f52a 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -30,6 +30,7 @@ import (
"github.com/spf13/cobra"
bannerouter
"hertzbeat.apache.org/hertzbeat-collector-go/internal/banner"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
jobserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
clrserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
transportserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
@@ -108,14 +109,20 @@ func server(ctx context.Context, logOut io.Writer) error {
}
func startRunners(ctx context.Context, cfg *clrserver.Server) error {
- // Create job server first
+ // Create transport server first
+ transportRunner := transportserver.NewFromConfig(cfg.Config)
+
+ // Create lazy message router that can get transport client when needed
+ messageRouter := collect.NewLazyMessageRouter(transportRunner,
cfg.Logger, cfg.Config.Collector.Identity)
+
+ // Create job server with message router
jobRunner := jobserver.New(&jobserver.Config{
- Server: *cfg,
+ Server: *cfg,
+ MessageRouter: messageRouter,
})
- // Create transport server and connect it to job server
- transportRunner := transportserver.NewFromConfig(cfg.Config)
- transportRunner.SetJobScheduler(jobRunner) // Connect transport to job
scheduler
+ // Connect transport to job scheduler
+ transportRunner.SetJobScheduler(jobRunner)
runners := []struct {
runner Runner[collectortypes.Info]
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go
b/internal/collector/common/collect/dispatch/metrics_collector.go
index bc3ea2e..0795a8a 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -77,7 +77,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
// Enrich result with job information
if result != nil {
- result.ID = job.ID
+ result.ID = job.MonitorID // Use MonitorID for both ID
and MonitorID fields
result.MonitorID = job.MonitorID
result.App = job.App
result.TenantID = job.TenantID
@@ -127,7 +127,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
// createErrorResponse creates an error response for failed collections
func (mc *MetricsCollector) createErrorResponse(metrics *jobtypes.Metrics, job
*jobtypes.Job, code int, message string) *jobtypes.CollectRepMetricsData {
return &jobtypes.CollectRepMetricsData{
- ID: job.ID,
+ ID: job.MonitorID, // Use MonitorID for both ID and
MonitorID fields
MonitorID: job.MonitorID,
TenantID: job.TenantID,
App: job.App,
diff --git a/internal/collector/common/collect/lazy_message_router.go
b/internal/collector/common/collect/lazy_message_router.go
index b12137d..caff0ec 100644
--- a/internal/collector/common/collect/lazy_message_router.go
+++ b/internal/collector/common/collect/lazy_message_router.go
@@ -35,6 +35,11 @@ import (
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
+// MessageRouter defines the interface for sending collection results
+type MessageRouter interface {
+ SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job)
error
+}
+
// TransportRunner interface for getting transport client
type TransportRunner interface {
GetClient() transport.TransportClient
@@ -68,7 +73,7 @@ func (l *LazyMessageRouter) SendResult(data
*jobtypes.CollectRepMetricsData, job
client := l.transportRunner.GetClient()
if client == nil || !client.IsStarted() {
l.logger.V(1).Info("transport client not ready, dropping
result",
- "jobID", job.ID,
+ "jobID", job.MonitorID,
"metricsName", data.Metrics,
"isCyclic", job.IsCyclic)
return fmt.Errorf("transport client not ready")
diff --git a/internal/collector/common/collect/result_handler.go
b/internal/collector/common/collect/result_handler.go
index 22ceb1f..48cc240 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/collector/common/collect/result_handler.go
@@ -29,8 +29,8 @@ import (
// ResultHandlerImpl implements the ResultHandler interface
type ResultHandlerImpl struct {
- logger logger.Logger
- // TODO: Add data queue or storage interface when needed
+ logger logger.Logger
+ messageRouter MessageRouter // Message router for sending results back
to Manager
}
// ResultHandler interface for handling collection results
@@ -39,9 +39,10 @@ type ResultHandler interface {
}
// NewResultHandler creates a new result handler
-func NewResultHandler(logger logger.Logger) ResultHandler {
+func NewResultHandler(logger logger.Logger, messageRouter MessageRouter)
ResultHandler {
return &ResultHandlerImpl{
- logger: logger.WithName("result-handler"),
+ logger: logger.WithName("result-handler"),
+ messageRouter: messageRouter,
}
}
@@ -58,24 +59,31 @@ func (rh *ResultHandlerImpl) HandleCollectData(data
*jobtypes.CollectRepMetricsD
"code", data.Code,
"valuesCount", len(data.Values))
- // TODO: Implement actual data processing logic
- // This could include:
- // 1. Data validation and transformation
- // 2. Sending to message queue
- // 3. Storing to database
- // 4. Triggering alerts based on thresholds
- // 5. Updating monitoring status
+ // Send collection result back to Manager via MessageRouter
+ if rh.messageRouter != nil {
+ if err := rh.messageRouter.SendResult(data, job); err != nil {
+ rh.logger.Error(err, "failed to send collection result
to Manager",
+ "jobID", job.ID,
+ "metricsName", data.Metrics)
+ return fmt.Errorf("failed to send result to Manager:
%w", err)
+ }
- // Only log failures at INFO level, success at debug level
- if data.Code == http.StatusOK {
- rh.logger.V(1).Info("successfully processed collect data",
- "metricsName", data.Metrics)
+ // Log successful result sending
+ if data.Code == http.StatusOK {
+ rh.logger.V(1).Info("successfully sent collection
result to Manager",
+ "metricsName", data.Metrics)
+ } else {
+ rh.logger.Info("sent failed collection result to
Manager",
+ "jobID", job.ID,
+ "metricsName", data.Metrics,
+ "code", data.Code,
+ "message", data.Msg)
+ }
} else {
- rh.logger.Info("received failed collect data",
+ rh.logger.Error(nil, "messageRouter is nil, cannot send result
to Manager",
"jobID", job.ID,
- "metricsName", data.Metrics,
- "code", data.Code,
- "message", data.Msg)
+ "metricsName", data.Metrics)
+ return fmt.Errorf("messageRouter is nil")
}
return nil
diff --git a/internal/collector/common/job/job_server.go
b/internal/collector/common/job/job_server.go
index 2a222f8..8702830 100644
--- a/internal/collector/common/job/job_server.go
+++ b/internal/collector/common/job/job_server.go
@@ -45,7 +45,8 @@ type TimeDispatcher interface {
// Config represents job service configuration
type Config struct {
clrserver.Server
- TimeDispatch TimeDispatcher
+ TimeDispatch TimeDispatcher
+ MessageRouter collect.MessageRouter // Message router for sending
collection results
}
// Runner implements the service runner interface
@@ -119,8 +120,8 @@ func (r *Runner) RunningJobs() map[int64]*jobtypes.Job {
// New creates a new job service runner with all components initialized
func New(srv *Config) *Runner {
- // Create result handler
- resultHandler := collect.NewResultHandler(srv.Logger)
+ // Create result handler with message router
+ resultHandler := collect.NewResultHandler(srv.Logger, srv.MessageRouter)
// Create metrics collector
metricsCollector := dispatch.NewMetricsCollector(srv.Logger)
diff --git a/internal/collector/common/router/message_router.go
b/internal/collector/common/router/message_router.go
index 262dff7..e3093ee 100644
--- a/internal/collector/common/router/message_router.go
+++ b/internal/collector/common/router/message_router.go
@@ -39,8 +39,8 @@ const (
MessageTypeIssueCyclicTask int32 = 4
MessageTypeDeleteCyclicTask int32 = 5
MessageTypeIssueOneTimeTask int32 = 6
- MessageTypeResponseCyclicTaskData int32 = 7
- MessageTypeResponseOneTimeTaskData int32 = 8
+ MessageTypeResponseOneTimeTaskData int32 = 7
+ MessageTypeResponseCyclicTaskData int32 = 8
MessageTypeResponseCyclicTaskSdData int32 = 9
)
@@ -245,7 +245,7 @@ func (r *MessageRouterImpl) SendResult(data
*jobtypes.CollectRepMetricsData, job
// Create message
msg := &pb.Message{
Type: msgType,
- Direction: pb.Direction_REQUEST,
+ Direction: pb.Direction_RESPONSE,
Identity: r.identity,
Msg: dataBytes,
}
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index b9df9a3..408a0b5 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -41,8 +41,8 @@ const (
MessageTypeIssueCyclicTask int32 = 4
MessageTypeDeleteCyclicTask int32 = 5
MessageTypeIssueOneTimeTask int32 = 6
- MessageTypeResponseCyclicTaskData int32 = 7
- MessageTypeResponseOneTimeTaskData int32 = 8
+ MessageTypeResponseOneTimeTaskData int32 = 7
+ MessageTypeResponseCyclicTaskData int32 = 8
MessageTypeResponseCyclicTaskSdData int32 = 9
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]