This is an automated email from the ASF dual-hosted git repository.

gaoxingcun pushed a commit to branch feature/go-collector-arrow-communication
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git


The following commit(s) were added to 
refs/heads/feature/go-collector-arrow-communication by this push:
     new 5f1f311  add:
5f1f311 is described below

commit 5f1f311b5e28b48df9fc08c44dc476117a2cfc4d
Author: TJxiaobao <[email protected]>
AuthorDate: Tue Oct 14 23:34:07 2025 +0800

    add:
    
    1、Add Apache Arrow serialization for metrics data compatibility
    2、Implement complete result handling and message routing
    3、Fix MonitorID matching between Go collector and Java Manager
    4、Add one-time and cyclic task result aggregation
    5、Integrate message router with job scheduler
---
 go.mod                                             | 11 ++++++
 go.sum                                             | 27 +++++++++++++
 internal/cmd/server.go                             | 17 +++++---
 .../common/collect/dispatch/metrics_collector.go   |  4 +-
 .../common/collect/lazy_message_router.go          |  7 +++-
 .../collector/common/collect/result_handler.go     | 46 +++++++++++++---------
 internal/collector/common/job/job_server.go        |  7 ++--
 internal/collector/common/router/message_router.go |  6 +--
 internal/transport/processors.go                   |  4 +-
 9 files changed, 94 insertions(+), 35 deletions(-)

diff --git a/go.mod b/go.mod
index 9d01f03..88dc66f 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module hertzbeat.apache.org/hertzbeat-collector-go
 go 1.24.6
 
 require (
+       github.com/apache/arrow/go/v13 v13.0.0
        github.com/go-logr/logr v1.4.3
        github.com/go-logr/zapr v1.3.0
        github.com/go-sql-driver/mysql v1.9.3
@@ -22,21 +23,31 @@ require (
        github.com/beorn7/perks v1.0.1 // indirect
        github.com/cespare/xxhash/v2 v2.3.0 // indirect
        github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // 
indirect
+       github.com/goccy/go-json v0.10.0 // indirect
        github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // 
indirect
        github.com/golang-sql/sqlexp v0.1.0 // indirect
+       github.com/google/flatbuffers v23.1.21+incompatible // indirect
        github.com/google/uuid v1.6.0 // indirect
        github.com/inconshreveable/mousetrap v1.1.0 // indirect
+       github.com/klauspost/compress v1.18.0 // indirect
+       github.com/klauspost/cpuid/v2 v2.2.3 // indirect
        github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // 
indirect
+       github.com/pierrec/lz4/v4 v4.1.17 // indirect
        github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // 
indirect
        github.com/prometheus/client_model v0.6.2 // indirect
        github.com/prometheus/common v0.65.0 // indirect
        github.com/prometheus/procfs v0.16.1 // indirect
        github.com/rogpeppe/go-internal v1.13.1 // indirect
        github.com/spf13/pflag v1.0.9 // indirect
+       github.com/zeebo/xxh3 v1.0.2 // indirect
        go.uber.org/multierr v1.11.0 // indirect
        golang.org/x/crypto v0.39.0 // indirect
+       golang.org/x/mod v0.25.0 // indirect
        golang.org/x/net v0.41.0 // indirect
+       golang.org/x/sync v0.15.0 // indirect
        golang.org/x/sys v0.33.0 // indirect
        golang.org/x/text v0.26.0 // indirect
+       golang.org/x/tools v0.33.0 // indirect
+       golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
        google.golang.org/genproto/googleapis/rpc 
v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
 )
diff --git a/go.sum b/go.sum
index 333de92..1e8a909 100644
--- a/go.sum
+++ b/go.sum
@@ -12,6 +12,8 @@ 
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1 h1:bFWuo
 github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1/go.mod 
h1:Vih/3yc6yac2JzU4hzpaDupBJP0Flaia9rXXrU8xyww=
 github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 
h1:oygO0locgZJe7PpYPXT5A29ZkwJaPqcva7BVeemZOZs=
 github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2/go.mod 
h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
+github.com/apache/arrow/go/v13 v13.0.0 
h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk=
+github.com/apache/arrow/go/v13 v13.0.0/go.mod 
h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod 
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/cespare/xxhash/v2 v2.3.0 
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -27,6 +29,8 @@ github.com/go-logr/zapr v1.3.0 
h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ=
 github.com/go-logr/zapr v1.3.0/go.mod 
h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg=
 github.com/go-sql-driver/mysql v1.9.3 
h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo=
 github.com/go-sql-driver/mysql v1.9.3/go.mod 
h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
+github.com/goccy/go-json v0.10.0 
h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
+github.com/goccy/go-json v0.10.0/go.mod 
h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
 github.com/golang-jwt/jwt/v5 v5.2.2 
h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
 github.com/golang-jwt/jwt/v5 v5.2.2/go.mod 
h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
 github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 
h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA=
@@ -35,12 +39,18 @@ github.com/golang-sql/sqlexp v0.1.0 
h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei
 github.com/golang-sql/sqlexp v0.1.0/go.mod 
h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI=
 github.com/golang/protobuf v1.5.4 
h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
 github.com/golang/protobuf v1.5.4/go.mod 
h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/google/flatbuffers v23.1.21+incompatible 
h1:bUqzx/MXCDxuS0hRJL2EfjyZL3uQrPbMocUa8zGqsTA=
+github.com/google/flatbuffers v23.1.21+incompatible/go.mod 
h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
 github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
 github.com/google/go-cmp v0.7.0/go.mod 
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/inconshreveable/mousetrap v1.1.0 
h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
 github.com/inconshreveable/mousetrap v1.1.0/go.mod 
h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
+github.com/klauspost/compress v1.18.0 
h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
+github.com/klauspost/compress v1.18.0/go.mod 
h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
+github.com/klauspost/cpuid/v2 v2.2.3 
h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU=
+github.com/klauspost/cpuid/v2 v2.2.3/go.mod 
h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
 github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
 github.com/kr/pretty v0.3.1/go.mod 
h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -53,6 +63,8 @@ github.com/microsoft/go-mssqldb v1.9.3 
h1:hy4p+LDC8LIGvI3JATnLVmBOLMJbmn5X400mr5
 github.com/microsoft/go-mssqldb v1.9.3/go.mod 
h1:GBbW9ASTiDC+mpgWDGKdm3FnFLTUsLYN3iFL90lQ+PA=
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 
h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod 
h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
+github.com/pierrec/lz4/v4 v4.1.17 
h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
+github.com/pierrec/lz4/v4 v4.1.17/go.mod 
h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c 
h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
 github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod 
h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 
h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
@@ -74,6 +86,10 @@ github.com/spf13/pflag v1.0.9 
h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
 github.com/spf13/pflag v1.0.9/go.mod 
h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
 github.com/stretchr/testify v1.10.0 
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
 github.com/stretchr/testify v1.10.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
+github.com/zeebo/assert v1.3.0/go.mod 
h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
+github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
+github.com/zeebo/xxh3 v1.0.2/go.mod 
h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
 go.opentelemetry.io/auto/sdk v1.1.0 
h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
 go.opentelemetry.io/auto/sdk v1.1.0/go.mod 
h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
 go.opentelemetry.io/otel v1.37.0 
h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
@@ -94,12 +110,23 @@ go.uber.org/zap v1.27.0 
h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
 go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
 golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
 golang.org/x/crypto v0.39.0/go.mod 
h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
+golang.org/x/exp v0.0.0-20230206171751-46f607a40771 
h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg=
+golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod 
h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
+golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
+golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
 golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
 golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
+golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
+golang.org/x/sync v0.15.0/go.mod 
h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
+golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
 golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
 golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
 golang.org/x/text v0.26.0/go.mod 
h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
+golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc=
+golang.org/x/tools v0.33.0/go.mod 
h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 
h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod 
h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
 gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
 gonum.org/v1/gonum v0.16.0/go.mod 
h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 
h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY=
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 1e98ceb..687f52a 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -30,6 +30,7 @@ import (
        "github.com/spf13/cobra"
 
        bannerouter 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/banner"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
        jobserver 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
        clrserver 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
        transportserver 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
@@ -108,14 +109,20 @@ func server(ctx context.Context, logOut io.Writer) error {
 }
 
 func startRunners(ctx context.Context, cfg *clrserver.Server) error {
-       // Create job server first
+       // Create transport server first
+       transportRunner := transportserver.NewFromConfig(cfg.Config)
+
+       // Create lazy message router that can get transport client when needed
+       messageRouter := collect.NewLazyMessageRouter(transportRunner, 
cfg.Logger, cfg.Config.Collector.Identity)
+
+       // Create job server with message router
        jobRunner := jobserver.New(&jobserver.Config{
-               Server: *cfg,
+               Server:        *cfg,
+               MessageRouter: messageRouter,
        })
 
-       // Create transport server and connect it to job server
-       transportRunner := transportserver.NewFromConfig(cfg.Config)
-       transportRunner.SetJobScheduler(jobRunner) // Connect transport to job 
scheduler
+       // Connect transport to job scheduler
+       transportRunner.SetJobScheduler(jobRunner)
 
        runners := []struct {
                runner Runner[collectortypes.Info]
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go 
b/internal/collector/common/collect/dispatch/metrics_collector.go
index bc3ea2e..0795a8a 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -77,7 +77,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics 
*jobtypes.Metrics, job *jobty
 
                // Enrich result with job information
                if result != nil {
-                       result.ID = job.ID
+                       result.ID = job.MonitorID // Use MonitorID for both ID 
and MonitorID fields
                        result.MonitorID = job.MonitorID
                        result.App = job.App
                        result.TenantID = job.TenantID
@@ -127,7 +127,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics 
*jobtypes.Metrics, job *jobty
 // createErrorResponse creates an error response for failed collections
 func (mc *MetricsCollector) createErrorResponse(metrics *jobtypes.Metrics, job 
*jobtypes.Job, code int, message string) *jobtypes.CollectRepMetricsData {
        return &jobtypes.CollectRepMetricsData{
-               ID:        job.ID,
+               ID:        job.MonitorID, // Use MonitorID for both ID and 
MonitorID fields
                MonitorID: job.MonitorID,
                TenantID:  job.TenantID,
                App:       job.App,
diff --git a/internal/collector/common/collect/lazy_message_router.go 
b/internal/collector/common/collect/lazy_message_router.go
index b12137d..caff0ec 100644
--- a/internal/collector/common/collect/lazy_message_router.go
+++ b/internal/collector/common/collect/lazy_message_router.go
@@ -35,6 +35,11 @@ import (
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
+// MessageRouter defines the interface for sending collection results
+type MessageRouter interface {
+       SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) 
error
+}
+
 // TransportRunner interface for getting transport client
 type TransportRunner interface {
        GetClient() transport.TransportClient
@@ -68,7 +73,7 @@ func (l *LazyMessageRouter) SendResult(data 
*jobtypes.CollectRepMetricsData, job
        client := l.transportRunner.GetClient()
        if client == nil || !client.IsStarted() {
                l.logger.V(1).Info("transport client not ready, dropping 
result",
-                       "jobID", job.ID,
+                       "jobID", job.MonitorID,
                        "metricsName", data.Metrics,
                        "isCyclic", job.IsCyclic)
                return fmt.Errorf("transport client not ready")
diff --git a/internal/collector/common/collect/result_handler.go 
b/internal/collector/common/collect/result_handler.go
index 22ceb1f..48cc240 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/collector/common/collect/result_handler.go
@@ -29,8 +29,8 @@ import (
 
 // ResultHandlerImpl implements the ResultHandler interface
 type ResultHandlerImpl struct {
-       logger logger.Logger
-       // TODO: Add data queue or storage interface when needed
+       logger        logger.Logger
+       messageRouter MessageRouter // Message router for sending results back 
to Manager
 }
 
 // ResultHandler interface for handling collection results
@@ -39,9 +39,10 @@ type ResultHandler interface {
 }
 
 // NewResultHandler creates a new result handler
-func NewResultHandler(logger logger.Logger) ResultHandler {
+func NewResultHandler(logger logger.Logger, messageRouter MessageRouter) 
ResultHandler {
        return &ResultHandlerImpl{
-               logger: logger.WithName("result-handler"),
+               logger:        logger.WithName("result-handler"),
+               messageRouter: messageRouter,
        }
 }
 
@@ -58,24 +59,31 @@ func (rh *ResultHandlerImpl) HandleCollectData(data 
*jobtypes.CollectRepMetricsD
                "code", data.Code,
                "valuesCount", len(data.Values))
 
-       // TODO: Implement actual data processing logic
-       // This could include:
-       // 1. Data validation and transformation
-       // 2. Sending to message queue
-       // 3. Storing to database
-       // 4. Triggering alerts based on thresholds
-       // 5. Updating monitoring status
+       // Send collection result back to Manager via MessageRouter
+       if rh.messageRouter != nil {
+               if err := rh.messageRouter.SendResult(data, job); err != nil {
+                       rh.logger.Error(err, "failed to send collection result 
to Manager",
+                               "jobID", job.ID,
+                               "metricsName", data.Metrics)
+                       return fmt.Errorf("failed to send result to Manager: 
%w", err)
+               }
 
-       // Only log failures at INFO level, success at debug level
-       if data.Code == http.StatusOK {
-               rh.logger.V(1).Info("successfully processed collect data",
-                       "metricsName", data.Metrics)
+               // Log successful result sending
+               if data.Code == http.StatusOK {
+                       rh.logger.V(1).Info("successfully sent collection 
result to Manager",
+                               "metricsName", data.Metrics)
+               } else {
+                       rh.logger.Info("sent failed collection result to 
Manager",
+                               "jobID", job.ID,
+                               "metricsName", data.Metrics,
+                               "code", data.Code,
+                               "message", data.Msg)
+               }
        } else {
-               rh.logger.Info("received failed collect data",
+               rh.logger.Error(nil, "messageRouter is nil, cannot send result 
to Manager",
                        "jobID", job.ID,
-                       "metricsName", data.Metrics,
-                       "code", data.Code,
-                       "message", data.Msg)
+                       "metricsName", data.Metrics)
+               return fmt.Errorf("messageRouter is nil")
        }
 
        return nil
diff --git a/internal/collector/common/job/job_server.go 
b/internal/collector/common/job/job_server.go
index 2a222f8..8702830 100644
--- a/internal/collector/common/job/job_server.go
+++ b/internal/collector/common/job/job_server.go
@@ -45,7 +45,8 @@ type TimeDispatcher interface {
 // Config represents job service configuration
 type Config struct {
        clrserver.Server
-       TimeDispatch TimeDispatcher
+       TimeDispatch  TimeDispatcher
+       MessageRouter collect.MessageRouter // Message router for sending 
collection results
 }
 
 // Runner implements the service runner interface
@@ -119,8 +120,8 @@ func (r *Runner) RunningJobs() map[int64]*jobtypes.Job {
 
 // New creates a new job service runner with all components initialized
 func New(srv *Config) *Runner {
-       // Create result handler
-       resultHandler := collect.NewResultHandler(srv.Logger)
+       // Create result handler with message router
+       resultHandler := collect.NewResultHandler(srv.Logger, srv.MessageRouter)
 
        // Create metrics collector
        metricsCollector := dispatch.NewMetricsCollector(srv.Logger)
diff --git a/internal/collector/common/router/message_router.go 
b/internal/collector/common/router/message_router.go
index 262dff7..e3093ee 100644
--- a/internal/collector/common/router/message_router.go
+++ b/internal/collector/common/router/message_router.go
@@ -39,8 +39,8 @@ const (
        MessageTypeIssueCyclicTask          int32 = 4
        MessageTypeDeleteCyclicTask         int32 = 5
        MessageTypeIssueOneTimeTask         int32 = 6
-       MessageTypeResponseCyclicTaskData   int32 = 7
-       MessageTypeResponseOneTimeTaskData  int32 = 8
+       MessageTypeResponseOneTimeTaskData  int32 = 7
+       MessageTypeResponseCyclicTaskData   int32 = 8
        MessageTypeResponseCyclicTaskSdData int32 = 9
 )
 
@@ -245,7 +245,7 @@ func (r *MessageRouterImpl) SendResult(data 
*jobtypes.CollectRepMetricsData, job
        // Create message
        msg := &pb.Message{
                Type:      msgType,
-               Direction: pb.Direction_REQUEST,
+               Direction: pb.Direction_RESPONSE,
                Identity:  r.identity,
                Msg:       dataBytes,
        }
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index b9df9a3..408a0b5 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -41,8 +41,8 @@ const (
        MessageTypeIssueCyclicTask          int32 = 4
        MessageTypeDeleteCyclicTask         int32 = 5
        MessageTypeIssueOneTimeTask         int32 = 6
-       MessageTypeResponseCyclicTaskData   int32 = 7
-       MessageTypeResponseOneTimeTaskData  int32 = 8
+       MessageTypeResponseOneTimeTaskData  int32 = 7
+       MessageTypeResponseCyclicTaskData   int32 = 8
        MessageTypeResponseCyclicTaskSdData int32 = 9
 )
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to