This is an automated email from the ASF dual-hosted git repository.
liuhongyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new 4950df6 feat: implement Apache Arrow data communication with Java
Manager (#24)
4950df6 is described below
commit 4950df63c5379b7eda825b3925fa3ac502c81ff1
Author: 铁甲小宝 <[email protected]>
AuthorDate: Wed Oct 15 23:35:50 2025 +0800
feat: implement Apache Arrow data communication with Java Manager (#24)
* add: The first version of the modified schedule
* feat: implement Apache Arrow data communication with Java Manager
1、Add Apache Arrow serialization for metrics data compatibility
2、Implement complete result handling and message routing
3、Fix MonitorID matching between Go collector and Java Manager
4、Add one-time and cyclic task result aggregation
5、Integrate message router with job scheduler
* add:
1、Add Apache Arrow serialization for metrics data compatibility
2、Implement complete result handling and message routing
3、Fix MonitorID matching between Go collector and Java Manager
4、Add one-time and cyclic task result aggregation
5、Integrate message router with job scheduler
---
Makefile | 11 +-
.../docker-compose.yml => docker-compose.yml | 4 +-
go.mod | 11 +
go.sum | 27 ++
internal/cmd/server.go | 17 +-
.../common/collect/dispatch/metrics_collector.go | 4 +-
.../common/collect/lazy_message_router.go | 422 +++++++++++++++++++++
.../collector/common/collect/result_handler.go | 46 ++-
internal/collector/common/job/job_server.go | 7 +-
internal/collector/common/router/message_router.go | 6 +-
internal/transport/processors.go | 4 +-
internal/util/arrow/arrow_serializer.go | 151 ++++++++
tools/docker/hcg/Dockerfile | 35 --
tools/make/image.mk | 58 ---
14 files changed, 668 insertions(+), 135 deletions(-)
diff --git a/Makefile b/Makefile
index 6b33134..cd02530 100644
--- a/Makefile
+++ b/Makefile
@@ -17,12 +17,11 @@
_run:
@$(MAKE) --warn-undefined-variables \
- -f tools/make/common.mk \
- -f tools/make/golang.mk \
- -f tools/make/linter.mk \
- -f tools/make/image.mk \
- -f tools/make/tools.mk \
- $(MAKECMDGOALS)
+ -f tools/make/common.mk \
+ -f tools/make/golang.mk \
+ -f tools/make/linter.mk \
+ -f tools/make/tools.mk \
+ $(MAKECMDGOALS)
.PHONY: _run
diff --git a/tools/docker/docker-compose/docker-compose.yml b/docker-compose.yml
similarity index 93%
rename from tools/docker/docker-compose/docker-compose.yml
rename to docker-compose.yml
index 427b692..7c158b5 100644
--- a/tools/docker/docker-compose/docker-compose.yml
+++ b/docker-compose.yml
@@ -19,11 +19,11 @@ services:
# collector go service
hertzbeat-collector:
- image: hertzbeat-collector-go:latest
+ image: hertzbeat/hertzbeat-collector-go:latest
container_name: hertzbeat-collector-go
restart: on-failure
ports:
- - "8080:8080"
+ - "8090:8090"
networks:
hcg-network:
diff --git a/go.mod b/go.mod
index 9d01f03..88dc66f 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module hertzbeat.apache.org/hertzbeat-collector-go
go 1.24.6
require (
+ github.com/apache/arrow/go/v13 v13.0.0
github.com/go-logr/logr v1.4.3
github.com/go-logr/zapr v1.3.0
github.com/go-sql-driver/mysql v1.9.3
@@ -22,21 +23,31 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc //
indirect
+ github.com/goccy/go-json v0.10.0 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 //
indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
+ github.com/google/flatbuffers v23.1.21+incompatible // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
+ github.com/klauspost/compress v1.18.0 // indirect
+ github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 //
indirect
+ github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 //
indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/spf13/pflag v1.0.9 // indirect
+ github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.39.0 // indirect
+ golang.org/x/mod v0.25.0 // indirect
golang.org/x/net v0.41.0 // indirect
+ golang.org/x/sync v0.15.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.26.0 // indirect
+ golang.org/x/tools v0.33.0 // indirect
+ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto/googleapis/rpc
v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
)
diff --git a/go.sum b/go.sum
index 333de92..1e8a909 100644
--- a/go.sum
+++ b/go.sum
@@ -12,6 +12,8 @@
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1 h1:bFWuo
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1/go.mod
h1:Vih/3yc6yac2JzU4hzpaDupBJP0Flaia9rXXrU8xyww=
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2
h1:oygO0locgZJe7PpYPXT5A29ZkwJaPqcva7BVeemZOZs=
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2/go.mod
h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
+github.com/apache/arrow/go/v13 v13.0.0
h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk=
+github.com/apache/arrow/go/v13 v13.0.0/go.mod
h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -27,6 +29,8 @@ github.com/go-logr/zapr v1.3.0
h1:XGdV8XW8zdwFiwOA2Dryh1gj2KRQyOOoNmBy4EplIcQ=
github.com/go-logr/zapr v1.3.0/go.mod
h1:YKepepNBd1u/oyhd/yQmtjVXmm9uML4IXUgMOwR8/Gg=
github.com/go-sql-driver/mysql v1.9.3
h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo=
github.com/go-sql-driver/mysql v1.9.3/go.mod
h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU=
+github.com/goccy/go-json v0.10.0
h1:mXKd9Qw4NuzShiRlOXKews24ufknHO7gx30lsDyokKA=
+github.com/goccy/go-json v0.10.0/go.mod
h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang-jwt/jwt/v5 v5.2.2
h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod
h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9
h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA=
@@ -35,12 +39,18 @@ github.com/golang-sql/sqlexp v0.1.0
h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei
github.com/golang-sql/sqlexp v0.1.0/go.mod
h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI=
github.com/golang/protobuf v1.5.4
h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod
h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/google/flatbuffers v23.1.21+incompatible
h1:bUqzx/MXCDxuS0hRJL2EfjyZL3uQrPbMocUa8zGqsTA=
+github.com/google/flatbuffers v23.1.21+incompatible/go.mod
h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/inconshreveable/mousetrap v1.1.0
h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod
h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
+github.com/klauspost/compress v1.18.0
h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
+github.com/klauspost/compress v1.18.0/go.mod
h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
+github.com/klauspost/cpuid/v2 v2.2.3
h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU=
+github.com/klauspost/cpuid/v2 v2.2.3/go.mod
h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod
h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -53,6 +63,8 @@ github.com/microsoft/go-mssqldb v1.9.3
h1:hy4p+LDC8LIGvI3JATnLVmBOLMJbmn5X400mr5
github.com/microsoft/go-mssqldb v1.9.3/go.mod
h1:GBbW9ASTiDC+mpgWDGKdm3FnFLTUsLYN3iFL90lQ+PA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod
h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
+github.com/pierrec/lz4/v4 v4.1.17
h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
+github.com/pierrec/lz4/v4 v4.1.17/go.mod
h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c
h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod
h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
@@ -74,6 +86,10 @@ github.com/spf13/pflag v1.0.9
h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
github.com/spf13/pflag v1.0.9/go.mod
h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.10.0
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
+github.com/zeebo/assert v1.3.0/go.mod
h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
+github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
+github.com/zeebo/xxh3 v1.0.2/go.mod
h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.opentelemetry.io/auto/sdk v1.1.0
h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod
h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.37.0
h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
@@ -94,12 +110,23 @@ go.uber.org/zap v1.27.0
h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
golang.org/x/crypto v0.39.0/go.mod
h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
+golang.org/x/exp v0.0.0-20230206171751-46f607a40771
h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg=
+golang.org/x/exp v0.0.0-20230206171751-46f607a40771/go.mod
h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
+golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
+golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
+golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
+golang.org/x/sync v0.15.0/go.mod
h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
+golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod
h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
+golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc=
+golang.org/x/tools v0.33.0/go.mod
h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod
h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod
h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7
h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY=
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 1e98ceb..687f52a 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -30,6 +30,7 @@ import (
"github.com/spf13/cobra"
bannerouter
"hertzbeat.apache.org/hertzbeat-collector-go/internal/banner"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
jobserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
clrserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
transportserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
@@ -108,14 +109,20 @@ func server(ctx context.Context, logOut io.Writer) error {
}
func startRunners(ctx context.Context, cfg *clrserver.Server) error {
- // Create job server first
+ // Create transport server first
+ transportRunner := transportserver.NewFromConfig(cfg.Config)
+
+ // Create lazy message router that can get transport client when needed
+ messageRouter := collect.NewLazyMessageRouter(transportRunner,
cfg.Logger, cfg.Config.Collector.Identity)
+
+ // Create job server with message router
jobRunner := jobserver.New(&jobserver.Config{
- Server: *cfg,
+ Server: *cfg,
+ MessageRouter: messageRouter,
})
- // Create transport server and connect it to job server
- transportRunner := transportserver.NewFromConfig(cfg.Config)
- transportRunner.SetJobScheduler(jobRunner) // Connect transport to job
scheduler
+ // Connect transport to job scheduler
+ transportRunner.SetJobScheduler(jobRunner)
runners := []struct {
runner Runner[collectortypes.Info]
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go
b/internal/collector/common/collect/dispatch/metrics_collector.go
index bc3ea2e..0795a8a 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -77,7 +77,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
// Enrich result with job information
if result != nil {
- result.ID = job.ID
+ result.ID = job.MonitorID // Use MonitorID for both ID
and MonitorID fields
result.MonitorID = job.MonitorID
result.App = job.App
result.TenantID = job.TenantID
@@ -127,7 +127,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
// createErrorResponse creates an error response for failed collections
func (mc *MetricsCollector) createErrorResponse(metrics *jobtypes.Metrics, job
*jobtypes.Job, code int, message string) *jobtypes.CollectRepMetricsData {
return &jobtypes.CollectRepMetricsData{
- ID: job.ID,
+ ID: job.MonitorID, // Use MonitorID for both ID and
MonitorID fields
MonitorID: job.MonitorID,
TenantID: job.TenantID,
App: job.App,
diff --git a/internal/collector/common/collect/lazy_message_router.go
b/internal/collector/common/collect/lazy_message_router.go
new file mode 100644
index 0000000..caff0ec
--- /dev/null
+++ b/internal/collector/common/collect/lazy_message_router.go
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package collect
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+
+ "github.com/apache/arrow/go/v13/arrow"
+ "github.com/apache/arrow/go/v13/arrow/array"
+ "github.com/apache/arrow/go/v13/arrow/ipc"
+ "github.com/apache/arrow/go/v13/arrow/memory"
+
+ pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// MessageRouter is the abstraction through which collection results are
+// sent onward.
+type MessageRouter interface {
+	// SendResult sends the metrics data collected for job and returns an
+	// error when the result could not be sent.
+	SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error
+}
+
+// TransportRunner is the minimal view of the transport layer needed to
+// obtain a transport client.
+type TransportRunner interface {
+	// GetClient returns the transport client. Callers in this file treat a
+	// nil return value as "client not ready".
+	GetClient() transport.TransportClient
+	// IsConnected presumably reports the runner's connection state; it is
+	// not called by the code in this file - confirm against the implementation.
+	IsConnected() bool
+}
+
+// LazyMessageRouter is a MessageRouter that asks the transport runner for
+// its client at send time instead of holding a client from construction.
+// This removes the startup-order dependency between the transport client
+// and the job runner.
+type LazyMessageRouter struct {
+	// transportRunner is queried for the transport client on every send.
+	transportRunner TransportRunner
+	// logger is the router's logger.
+	logger logger.Logger
+	// identity is written into the Identity field of every outgoing message.
+	identity string
+}
+
+// NewLazyMessageRouter builds a LazyMessageRouter around transportRunner.
+// The supplied logger is scoped with the name "lazy-message-router". An
+// empty identity is replaced by the default "collector-go".
+func NewLazyMessageRouter(transportRunner TransportRunner, logger logger.Logger, identity string) MessageRouter {
+	router := &LazyMessageRouter{
+		transportRunner: transportRunner,
+		logger:          logger.WithName("lazy-message-router"),
+		identity:        "collector-go", // Default identity
+	}
+
+	// A caller-provided identity takes precedence over the default.
+	if identity != "" {
+		router.identity = identity
+	}
+
+	return router
+}
+
+// SendResult implements the MessageRouter interface.
+//
+// The transport client is looked up on every call, so the router works
+// even if the client did not exist yet when the router was constructed.
+// When the client is missing or not started, the result is dropped and an
+// error is returned. Otherwise the result is sent via sendResultDirectly.
+//
+// An error is also returned when data or job is nil.
+func (l *LazyMessageRouter) SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error {
+	// Both arguments are dereferenced below; fail cleanly instead of panicking.
+	if data == nil || job == nil {
+		return fmt.Errorf("cannot send result: data or job is nil")
+	}
+
+	// Get transport client; a missing runner is treated like a missing client.
+	var client transport.TransportClient
+	if l.transportRunner != nil {
+		client = l.transportRunner.GetClient()
+	}
+	if client == nil || !client.IsStarted() {
+		// "jobID" carries job.ID, matching the log lines in sendResultDirectly;
+		// the monitor ID is reported under its own key.
+		l.logger.V(1).Info("transport client not ready, dropping result",
+			"jobID", job.ID,
+			"monitorID", job.MonitorID,
+			"metricsName", data.Metrics,
+			"isCyclic", job.IsCyclic)
+		return fmt.Errorf("transport client not ready")
+	}
+
+	// Send result directly
+	return l.sendResultDirectly(data, job, client)
+}
+
+// sendResultDirectly sends result directly to manager
+func (l *LazyMessageRouter) sendResultDirectly(data
*jobtypes.CollectRepMetricsData, job *jobtypes.Job, client
transport.TransportClient) error {
+ // Determine message type
+ var msgType pb.MessageType
+ if job.IsCyclic {
+ msgType = pb.MessageType_RESPONSE_CYCLIC_TASK_DATA
+ } else {
+ msgType = pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA
+ }
+
+ // Serialize data to Arrow format
+ dataBytes, err :=
l.serializeToArrow([]*jobtypes.CollectRepMetricsData{data})
+ if err != nil {
+ l.logger.Error(err, "failed to create arrow format",
+ "jobID", job.ID,
+ "metricsName", data.Metrics)
+ return fmt.Errorf("failed to create arrow format: %w", err)
+ }
+
+ // Create message - collector sends data to Manager as RESPONSE
(response to task)
+ msg := &pb.Message{
+ Type: msgType,
+ Direction: pb.Direction_RESPONSE,
+ Identity: l.identity,
+ Msg: dataBytes,
+ }
+
+ // Send message
+ if err := client.SendMsg(msg); err != nil {
+ l.logger.Error(err, "failed to send metrics data",
+ "jobID", job.ID,
+ "metricsName", data.Metrics,
+ "messageType", msgType)
+ return fmt.Errorf("failed to send metrics data: %w", err)
+ }
+
+ l.logger.Info("successfully sent metrics data",
+ "jobID", job.ID,
+ "metricsName", data.Metrics,
+ "isCyclic", job.IsCyclic,
+ "messageType", msgType,
+ "dataSize", len(dataBytes),
+ "direction", msg.Direction,
+ "identity", msg.Identity)
+
+ // Add detailed debugging information
+ l.logger.Info("message details for debugging",
+ "msgType", int(msgType),
+ "direction", int(msg.Direction),
+ "identity", msg.Identity,
+ "dataLength", len(dataBytes))
+
+ return nil
+}
+
+// serializeToArrow encodes dataList in the layout expected by the Java
+// Manager: a big-endian int32 holding the number of items, followed by one
+// self-contained Arrow IPC stream per item, in list order.
+func (l *LazyMessageRouter) serializeToArrow(dataList []*jobtypes.CollectRepMetricsData) ([]byte, error) {
+	var out bytes.Buffer
+
+	// Write root count (format expected by Java Manager)
+	if err := binary.Write(&out, binary.BigEndian, int32(len(dataList))); err != nil {
+		return nil, fmt.Errorf("failed to write root count: %w", err)
+	}
+
+	allocator := memory.NewGoAllocator()
+
+	// encodeOne turns a single item into its own Arrow stream. The record
+	// batch is released on every exit path via defer.
+	encodeOne := func(idx int, item *jobtypes.CollectRepMetricsData) ([]byte, error) {
+		batch, err := l.createArrowRecordBatch(allocator, item)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create record batch for data %d: %w", idx, err)
+		}
+		defer batch.Release()
+
+		var stream bytes.Buffer
+		w := ipc.NewWriter(&stream, ipc.WithSchema(batch.Schema()))
+		if err := w.Write(batch); err != nil {
+			w.Close()
+			return nil, fmt.Errorf("failed to write record batch %d: %w", idx, err)
+		}
+		if err := w.Close(); err != nil {
+			return nil, fmt.Errorf("failed to close writer for batch %d: %w", idx, err)
+		}
+		return stream.Bytes(), nil
+	}
+
+	// Append each item's stream to the main buffer.
+	for idx, item := range dataList {
+		stream, err := encodeOne(idx, item)
+		if err != nil {
+			return nil, err
+		}
+		if _, err := out.Write(stream); err != nil {
+			return nil, fmt.Errorf("failed to write stream data for batch %d: %w", idx, err)
+		}
+	}
+
+	return out.Bytes(), nil
+}
+
+// createArrowRecordBatch creates Arrow RecordBatch for single MetricsData,
compatible with Java Manager
+func (l *LazyMessageRouter) createArrowRecordBatch(mem memory.Allocator, data
*jobtypes.CollectRepMetricsData) (arrow.Record, error) {
+ // Create metadata for fields (all fields use the same default metadata)
+ emptyMetadata := arrow.MetadataFrom(map[string]string{
+ "type": "1",
+ "label": "false",
+ "unit": "none",
+ })
+
+ // Define metadata fields (fixed fields)
+ metadataFields := []arrow.Field{
+ {Name: "app", Type: arrow.BinaryTypes.String, Nullable: true,
Metadata: emptyMetadata},
+ {Name: "metrics", Type: arrow.BinaryTypes.String, Nullable:
true, Metadata: emptyMetadata},
+ {Name: "id", Type: arrow.BinaryTypes.String, Nullable: true,
Metadata: emptyMetadata},
+ {Name: "monitorId", Type: arrow.BinaryTypes.String, Nullable:
true, Metadata: emptyMetadata},
+ {Name: "tenantId", Type: arrow.BinaryTypes.String, Nullable:
true, Metadata: emptyMetadata},
+ {Name: "priority", Type: arrow.BinaryTypes.String, Nullable:
true, Metadata: emptyMetadata},
+ {Name: "time", Type: arrow.BinaryTypes.String, Nullable: true,
Metadata: emptyMetadata},
+ {Name: "code", Type: arrow.BinaryTypes.String, Nullable: true,
Metadata: emptyMetadata},
+ {Name: "msg", Type: arrow.BinaryTypes.String, Nullable: true,
Metadata: emptyMetadata},
+ }
+
+ // Add dynamic fields (based on collected field definitions)
+ dynamicFields := make([]arrow.Field, 0, len(data.Fields))
+ for _, field := range data.Fields {
+ // Ensure unit is not empty
+ unitValue := field.Unit
+ if unitValue == "" {
+ unitValue = "none"
+ }
+
+ // Create field metadata
+ typeValue := fmt.Sprintf("%d", field.Type)
+ labelValue := fmt.Sprintf("%t", field.Label)
+
+ fieldMetadata := arrow.MetadataFrom(map[string]string{
+ "type": typeValue,
+ "label": labelValue,
+ "unit": unitValue,
+ })
+
+ dynamicFields = append(dynamicFields, arrow.Field{
+ Name: field.Field,
+ Type: arrow.BinaryTypes.String,
+ Nullable: true,
+ Metadata: fieldMetadata,
+ })
+ }
+
+ // Merge all fields
+ allFields := make([]arrow.Field, 0,
len(metadataFields)+len(dynamicFields))
+ allFields = append(allFields, metadataFields...)
+ allFields = append(allFields, dynamicFields...)
+
+ // Create schema-level metadata
+ schemaMetadata := arrow.MetadataFrom(map[string]string{
+ "id": fmt.Sprintf("%d", data.ID),
+ "tenantId": fmt.Sprintf("%d", data.TenantID),
+ "app": data.App,
+ "metrics": data.Metrics,
+ "priority": fmt.Sprintf("%d", data.Priority),
+ "time": fmt.Sprintf("%d", data.Time),
+ "labels": "",
+ "annotations": "",
+ })
+
+ schema := arrow.NewSchema(allFields, &schemaMetadata)
+
+ // Create builders (all fields are String type)
+ builders := make([]array.Builder, len(allFields))
+ for i, field := range allFields {
+ builders[i] = array.NewBuilder(mem, field.Type)
+ }
+ defer func() {
+ for _, builder := range builders {
+ builder.Release()
+ }
+ }()
+
+ // Determine row count
+ rowCount := len(data.Values)
+ if rowCount == 0 {
+ rowCount = 1 // At least one row of metadata
+ }
+
+ // Fill data
+ for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
+ // Fill metadata fields
+ builders[0].(*array.StringBuilder).Append(data.App)
+ builders[1].(*array.StringBuilder).Append(data.Metrics)
+ builders[2].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.ID))
+ builders[3].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.MonitorID))
+ builders[4].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.TenantID))
+ builders[5].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.Priority))
+ builders[6].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.Time))
+ builders[7].(*array.StringBuilder).Append(fmt.Sprintf("%d",
data.Code))
+ builders[8].(*array.StringBuilder).Append(data.Msg)
+
+ // Fill dynamic field data
+ if rowIdx < len(data.Values) {
+ valueRow := data.Values[rowIdx]
+ for i, _ := range data.Fields {
+ builderIdx := len(metadataFields) + i
+ var value string
+ if i < len(valueRow.Columns) {
+ value = valueRow.Columns[i]
+ }
+
builders[builderIdx].(*array.StringBuilder).Append(value)
+ }
+ } else {
+ // Fill empty values
+ for i := range data.Fields {
+ builderIdx := len(metadataFields) + i
+
builders[builderIdx].(*array.StringBuilder).Append("")
+ }
+ }
+ }
+
+ // Build arrays
+ arrays := make([]arrow.Array, len(builders))
+ for i, builder := range builders {
+ arrays[i] = builder.NewArray()
+ defer arrays[i].Release()
+ }
+
+ // Create Record
+ record := array.NewRecord(schema, arrays, int64(rowCount))
+ return record, nil
+}
+
+// createUnifiedArrowRecordBatch builds one record containing the rows of
+// every item in dataList.
+//
+// The schema has seven base columns (id, monitorId, app, metrics, time,
+// code, msg) followed by one string column per field of the FIRST item.
+// Each item contributes one row per entry in its Values, or a single row
+// when it has none. Value cells are matched to columns by position. Every
+// value column receives exactly one cell per row, padded with "" when an
+// item has fewer fields or columns than the schema. Without that padding
+// the columns would differ in length and array.NewRecord would panic.
+//
+// An error is returned for an empty list or a nil list element. The
+// caller owns the returned record and must Release it.
+func (l *LazyMessageRouter) createUnifiedArrowRecordBatch(mem memory.Allocator, dataList []*jobtypes.CollectRepMetricsData) (arrow.Record, error) {
+	if len(dataList) == 0 {
+		return nil, fmt.Errorf("empty data list")
+	}
+	for i, data := range dataList {
+		if data == nil {
+			return nil, fmt.Errorf("nil metrics data at index %d", i)
+		}
+	}
+
+	// Use the first data item to define schema
+	firstData := dataList[0]
+	valueCols := len(firstData.Fields)
+
+	// Define basic fields - simplified schema, only core fields
+	fields := make([]arrow.Field, 0, 7+valueCols)
+	fields = append(fields,
+		arrow.Field{Name: "id", Type: arrow.PrimitiveTypes.Int64},
+		arrow.Field{Name: "monitorId", Type: arrow.PrimitiveTypes.Int64},
+		arrow.Field{Name: "app", Type: arrow.BinaryTypes.String},
+		arrow.Field{Name: "metrics", Type: arrow.BinaryTypes.String},
+		arrow.Field{Name: "time", Type: arrow.PrimitiveTypes.Int64},
+		arrow.Field{Name: "code", Type: arrow.PrimitiveTypes.Int32},
+		arrow.Field{Name: "msg", Type: arrow.BinaryTypes.String},
+	)
+
+	// Add value fields - based on first data item's Fields
+	for _, field := range firstData.Fields {
+		fields = append(fields, arrow.Field{
+			Name: field.Field,
+			Type: arrow.BinaryTypes.String,
+		})
+	}
+
+	schema := arrow.NewSchema(fields, nil)
+
+	// Typed builders, declared in schema order.
+	idCol := array.NewInt64Builder(mem)
+	monitorCol := array.NewInt64Builder(mem)
+	appCol := array.NewStringBuilder(mem)
+	metricsCol := array.NewStringBuilder(mem)
+	timeCol := array.NewInt64Builder(mem)
+	codeCol := array.NewInt32Builder(mem)
+	msgCol := array.NewStringBuilder(mem)
+	valueBuilders := make([]*array.StringBuilder, valueCols)
+	for i := range valueBuilders {
+		valueBuilders[i] = array.NewStringBuilder(mem)
+	}
+
+	builders := make([]array.Builder, 0, len(fields))
+	builders = append(builders, idCol, monitorCol, appCol, metricsCol, timeCol, codeCol, msgCol)
+	for _, vb := range valueBuilders {
+		builders = append(builders, vb)
+	}
+	// Deferred so the builders are released on every exit path.
+	defer func() {
+		for _, b := range builders {
+			b.Release()
+		}
+	}()
+
+	// Fill data, counting rows as they are appended.
+	totalRows := 0
+	for _, data := range dataList {
+		rowCount := len(data.Values)
+		if rowCount == 0 {
+			rowCount = 1 // At least one row of metadata
+		}
+		totalRows += rowCount
+
+		for row := 0; row < rowCount; row++ {
+			// Basic fields
+			idCol.Append(data.ID)
+			monitorCol.Append(data.MonitorID)
+			appCol.Append(data.App)
+			metricsCol.Append(data.Metrics)
+			timeCol.Append(data.Time)
+			codeCol.Append(int32(data.Code))
+			msgCol.Append(data.Msg)
+
+			// Value fields: iterate over the schema's value columns, not
+			// over this item's fields, so each column gets exactly one cell.
+			for i, vb := range valueBuilders {
+				cell := ""
+				if row < len(data.Values) && i < len(data.Fields) {
+					if cols := data.Values[row].Columns; i < len(cols) {
+						cell = cols[i]
+					}
+				}
+				vb.Append(cell)
+			}
+		}
+	}
+
+	// Build arrays and release this function's references on return.
+	arrays := make([]arrow.Array, len(builders))
+	for i, b := range builders {
+		arrays[i] = b.NewArray()
+	}
+	defer func() {
+		for _, a := range arrays {
+			a.Release()
+		}
+	}()
+
+	// Create RecordBatch
+	return array.NewRecord(schema, arrays, int64(totalRows)), nil
+}
diff --git a/internal/collector/common/collect/result_handler.go
b/internal/collector/common/collect/result_handler.go
index 22ceb1f..48cc240 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/collector/common/collect/result_handler.go
@@ -29,8 +29,8 @@ import (
// ResultHandlerImpl implements the ResultHandler interface
type ResultHandlerImpl struct {
- logger logger.Logger
- // TODO: Add data queue or storage interface when needed
+ logger logger.Logger
+ messageRouter MessageRouter // Message router for sending results back
to Manager
}
// ResultHandler interface for handling collection results
@@ -39,9 +39,10 @@ type ResultHandler interface {
}
// NewResultHandler creates a new result handler
-func NewResultHandler(logger logger.Logger) ResultHandler {
+func NewResultHandler(logger logger.Logger, messageRouter MessageRouter)
ResultHandler {
return &ResultHandlerImpl{
- logger: logger.WithName("result-handler"),
+ logger: logger.WithName("result-handler"),
+ messageRouter: messageRouter,
}
}
@@ -58,24 +59,31 @@ func (rh *ResultHandlerImpl) HandleCollectData(data
*jobtypes.CollectRepMetricsD
"code", data.Code,
"valuesCount", len(data.Values))
- // TODO: Implement actual data processing logic
- // This could include:
- // 1. Data validation and transformation
- // 2. Sending to message queue
- // 3. Storing to database
- // 4. Triggering alerts based on thresholds
- // 5. Updating monitoring status
+ // Send collection result back to Manager via MessageRouter
+ if rh.messageRouter != nil {
+ if err := rh.messageRouter.SendResult(data, job); err != nil {
+ rh.logger.Error(err, "failed to send collection result
to Manager",
+ "jobID", job.ID,
+ "metricsName", data.Metrics)
+ return fmt.Errorf("failed to send result to Manager:
%w", err)
+ }
- // Only log failures at INFO level, success at debug level
- if data.Code == http.StatusOK {
- rh.logger.V(1).Info("successfully processed collect data",
- "metricsName", data.Metrics)
+ // Log successful result sending
+ if data.Code == http.StatusOK {
+ rh.logger.V(1).Info("successfully sent collection
result to Manager",
+ "metricsName", data.Metrics)
+ } else {
+ rh.logger.Info("sent failed collection result to
Manager",
+ "jobID", job.ID,
+ "metricsName", data.Metrics,
+ "code", data.Code,
+ "message", data.Msg)
+ }
} else {
- rh.logger.Info("received failed collect data",
+ rh.logger.Error(nil, "messageRouter is nil, cannot send result
to Manager",
"jobID", job.ID,
- "metricsName", data.Metrics,
- "code", data.Code,
- "message", data.Msg)
+ "metricsName", data.Metrics)
+ return fmt.Errorf("messageRouter is nil")
}
return nil
diff --git a/internal/collector/common/job/job_server.go
b/internal/collector/common/job/job_server.go
index 2a222f8..8702830 100644
--- a/internal/collector/common/job/job_server.go
+++ b/internal/collector/common/job/job_server.go
@@ -45,7 +45,8 @@ type TimeDispatcher interface {
// Config represents job service configuration
type Config struct {
clrserver.Server
- TimeDispatch TimeDispatcher
+ TimeDispatch TimeDispatcher
+ MessageRouter collect.MessageRouter // Message router for sending
collection results
}
// Runner implements the service runner interface
@@ -119,8 +120,8 @@ func (r *Runner) RunningJobs() map[int64]*jobtypes.Job {
// New creates a new job service runner with all components initialized
func New(srv *Config) *Runner {
- // Create result handler
- resultHandler := collect.NewResultHandler(srv.Logger)
+ // Create result handler with message router
+ resultHandler := collect.NewResultHandler(srv.Logger, srv.MessageRouter)
// Create metrics collector
metricsCollector := dispatch.NewMetricsCollector(srv.Logger)
diff --git a/internal/collector/common/router/message_router.go
b/internal/collector/common/router/message_router.go
index 262dff7..e3093ee 100644
--- a/internal/collector/common/router/message_router.go
+++ b/internal/collector/common/router/message_router.go
@@ -39,8 +39,8 @@ const (
MessageTypeIssueCyclicTask int32 = 4
MessageTypeDeleteCyclicTask int32 = 5
MessageTypeIssueOneTimeTask int32 = 6
- MessageTypeResponseCyclicTaskData int32 = 7
- MessageTypeResponseOneTimeTaskData int32 = 8
+ MessageTypeResponseOneTimeTaskData int32 = 7
+ MessageTypeResponseCyclicTaskData int32 = 8
MessageTypeResponseCyclicTaskSdData int32 = 9
)
@@ -245,7 +245,7 @@ func (r *MessageRouterImpl) SendResult(data
*jobtypes.CollectRepMetricsData, job
// Create message
msg := &pb.Message{
Type: msgType,
- Direction: pb.Direction_REQUEST,
+ Direction: pb.Direction_RESPONSE,
Identity: r.identity,
Msg: dataBytes,
}
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index b9df9a3..408a0b5 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -41,8 +41,8 @@ const (
MessageTypeIssueCyclicTask int32 = 4
MessageTypeDeleteCyclicTask int32 = 5
MessageTypeIssueOneTimeTask int32 = 6
- MessageTypeResponseCyclicTaskData int32 = 7
- MessageTypeResponseOneTimeTaskData int32 = 8
+ MessageTypeResponseOneTimeTaskData int32 = 7
+ MessageTypeResponseCyclicTaskData int32 = 8
MessageTypeResponseCyclicTaskSdData int32 = 9
)
diff --git a/internal/util/arrow/arrow_serializer.go
b/internal/util/arrow/arrow_serializer.go
new file mode 100644
index 0000000..1915b43
--- /dev/null
+++ b/internal/util/arrow/arrow_serializer.go
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package arrow
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// ArrowSerializer turns collected metrics data into a serialized byte
+// payload. Its only state is a logger used to report serialization events.
+type ArrowSerializer struct {
+	logger logger.Logger // logger named "arrow-serializer"
+}
+
+// NewArrowSerializer returns an ArrowSerializer that logs under the
+// "arrow-serializer" name derived from the supplied logger.
+func NewArrowSerializer(logger logger.Logger) *ArrowSerializer {
+	serializer := new(ArrowSerializer)
+	serializer.logger = logger.WithName("arrow-serializer")
+	return serializer
+}
+
+// SerializeMetricsData serializes metrics data to Arrow format
+// For now, we'll use a compatible format that Manager can handle
+func (as *ArrowSerializer) SerializeMetricsData(dataList
[]*jobtypes.CollectRepMetricsData) ([]byte, error) {
+ if len(dataList) == 0 {
+ return nil, fmt.Errorf("empty data list")
+ }
+
+ // For compatibility with Java Manager, we need to create a format that
+ // ArrowUtil.deserializeMetricsData() can handle
+
+ // First, let's try to create a simple Arrow-compatible format
+ // Since we don't have full Arrow library, we'll create a minimal
implementation
+
+ // Convert to a format similar to what Java expects
+ arrowCompatibleData := make([]map[string]interface{}, len(dataList))
+
+ for i, data := range dataList {
+ // Create Arrow-like structure
+ arrowData := map[string]interface{}{
+ "id": data.ID,
+ "tenantId": data.TenantID,
+ "app": data.App,
+ "metrics": data.Metrics,
+ "priority": data.Priority,
+ "time": data.Time,
+ "code": data.Code,
+ "msg": data.Msg,
+ }
+
+ // Convert Values to Arrow-compatible format
+ if len(data.Values) > 0 {
+ // Create column-based structure like Arrow
+ columns := make(map[string][]interface{})
+
+ for _, valueRow := range data.Values {
+ for i, value := range valueRow.Columns {
+ if i < len(data.Fields) {
+ fieldName :=
data.Fields[i].Field
+ if columns[fieldName] == nil {
+ columns[fieldName] =
make([]interface{}, 0, len(data.Values))
+ }
+ columns[fieldName] =
append(columns[fieldName], value)
+ }
+ }
+ }
+
+ arrowData["columns"] = columns
+ arrowData["rowCount"] = len(data.Values)
+ }
+
+ arrowCompatibleData[i] = arrowData
+ }
+
+ // For now, serialize as JSON but with Arrow-like structure
+ // TODO: Implement proper Arrow serialization when Arrow library is
available
+ jsonBytes, err := json.Marshal(arrowCompatibleData)
+ if err != nil {
+ as.logger.Error(err, "failed to serialize arrow-compatible
data")
+ return nil, fmt.Errorf("failed to serialize arrow-compatible
data: %w", err)
+ }
+
+ // Create a minimal Arrow-like binary format
+ // This is a temporary solution until we have proper Arrow support
+ return as.createArrowLikeBinary(jsonBytes)
+}
+
+// createArrowLikeBinary wraps jsonData in a minimal length-prefixed envelope:
+//
+//	bytes 0-7    magic "ARROW1\x00\x00"
+//	bytes 8-11   schema length, little-endian uint32 (always 0: no schema is written)
+//	bytes 12-15  payload length, little-endian uint32
+//	bytes 16-    jsonData, unmodified
+//
+// NOTE(review): the magic equals the 8-byte marker that opens a genuine Arrow
+// IPC file, but what follows is a JSON payload rather than Arrow IPC messages,
+// so a standard Arrow reader is not expected to parse this output. Confirm
+// what the receiving side actually expects.
+//
+// An error is returned when jsonData does not fit the 32-bit length field;
+// previously the length was silently truncated, yielding a corrupt frame.
+func (as *ArrowSerializer) createArrowLikeBinary(jsonData []byte) ([]byte, error) {
+	const maxPayload = 1<<32 - 1
+	if uint64(len(jsonData)) > maxPayload {
+		return nil, fmt.Errorf("arrow-like payload too large: %d bytes exceeds uint32 length field", len(jsonData))
+	}
+
+	// le32 renders v as four little-endian bytes.
+	le32 := func(v uint32) []byte {
+		return []byte{byte(v), byte(v >> 8), byte(v >> 16), byte(v >> 24)}
+	}
+
+	magic := []byte("ARROW1\x00\x00")
+	schemaLen := uint32(0) // No schema for now
+	dataLen := uint32(len(jsonData))
+
+	var buffer bytes.Buffer
+	// Header is magic + two uint32 length fields; size the buffer once.
+	buffer.Grow(len(magic) + 4 + 4 + len(jsonData))
+
+	buffer.Write(magic)
+	buffer.Write(le32(schemaLen))
+	buffer.Write(le32(dataLen))
+	// The JSON data is written verbatim (as a temporary measure).
+	buffer.Write(jsonData)
+
+	as.logger.V(1).Info("created arrow-like binary format",
+		"originalSize", len(jsonData),
+		"binarySize", buffer.Len())
+
+	return buffer.Bytes(), nil
+}
+
+// FallbackToJSON marshals dataList directly as JSON, without the Arrow-like
+// binary envelope. It logs the fallback together with the number of entries
+// and returns a wrapped error when marshalling fails.
+func (as *ArrowSerializer) FallbackToJSON(dataList []*jobtypes.CollectRepMetricsData) ([]byte, error) {
+	as.logger.Info("falling back to JSON serialization", "dataCount", len(dataList))
+
+	encoded, marshalErr := json.Marshal(dataList)
+	if marshalErr == nil {
+		return encoded, nil
+	}
+	return nil, fmt.Errorf("failed to serialize to JSON: %w", marshalErr)
+}
diff --git a/tools/docker/hcg/Dockerfile b/tools/docker/hcg/Dockerfile
deleted file mode 100644
index 2bb553d..0000000
--- a/tools/docker/hcg/Dockerfile
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-FROM
docker.io/library/busybox@sha256:ab33eacc8251e3807b85bb6dba570e4698c3998eca6f0fc2ccb60575a563ea74
AS builder
-
-# prepare hertzbeat data dir
-RUN mkdir -p /var/hertzbeat
-
-# Use distroless as minimal base image to package the manager binary
-# Refer to https://github.com/GoogleContainerTools/distroless for more details
-FROM
gcr.io/distroless/base-nossl:nonroot@sha256:8981b63f968e829d21351ea9d28cc21127e5f034707f1d8483d2993d9577be0b
-
-COPY --from=builder /var/hertzbeat/ /var/hertzbeat/
-
-# copy binary to image, run make build to generate binary.
-COPY bin /usr/local/bin/
-COPY etc /var/hertzbeat/config/
-
-USER 65532:65532
-
-ENTRYPOINT ["/usr/local/bin/collector", "server", "--config",
"/var/hertzbeat/config/hertzbeat-collector.yaml"]
diff --git a/tools/make/image.mk b/tools/make/image.mk
deleted file mode 100644
index fc37001..0000000
--- a/tools/make/image.mk
+++ /dev/null
@@ -1,58 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# This is a wrapper to build and push docker image
-#
-
-# All make targets related to docker image are defined in this file.
-
-REGISTRY ?= docker.io
-
-TAG ?= $(shell git rev-parse HEAD)
-
-DOCKER := docker
-DOCKER_SUPPORTED_API_VERSION ?= 1.32
-
-IMAGES_DIR ?= $(wildcard tools/docker/hcg)
-
-IMAGES ?= hertzbeat-collector-go
-IMAGE_PLATFORMS ?= amd64 arm64
-
-BUILDX_CONTEXT = hcg-build-tools-builder
-
-##@ Image
-
-# todo: multi-platform build
-
-.PHONY: image-build
-image-build: ## Build docker image
-image-build: IMAGE_PLATFORMS = ${shell uname -m}
-image-build:
- @$(LOG_TARGET)
- make build
- $(DOCKER) buildx create --name $(BUILDX_CONTEXT) --use; \
- $(DOCKER) buildx use $(BUILDX_CONTEXT); \
- $(DOCKER) buildx build --load \
- -t $(REGISTRY)/${IMAGES}:$(TAG) \
- --platform linux/${IMAGE_PLATFORMS} \
- --file $(IMAGES_DIR)/Dockerfile . ; \
- $(DOCKER) buildx rm $(BUILDX_CONTEXT)
-
-.PHONY: image-push
-image-push: ## Push docker image
-image-push:
- @$(LOG_TARGET)
- $(DOCKER) push $(REGISTRY)/$${image}:$(TAG)-$${platform}; \
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]