This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new 64e44b2 Support transmit Span Attached Event protocol data. (#121)
64e44b2 is described below
commit 64e44b2f31fc1afd84e341d085f0bb1725e74c09
Author: mrproliu <[email protected]>
AuthorDate: Tue Nov 15 19:40:30 2022 +0800
Support transmit Span Attached Event protocol data. (#121)
---
CHANGES.md | 1 +
.../receiver_grpc-native-tracing-receiver.md | 2 +-
go.mod | 4 +-
go.sum | 11 ++--
plugins/forwarder/grpc/nativetracing/forwarder.go | 76 ++++++++++++++++------
.../grpc/nativetracing/attached_event_service.go | 58 +++++++++++++++++
plugins/receiver/grpc/nativetracing/receiver.go | 13 ++--
7 files changed, 135 insertions(+), 30 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index bf56e8c..889f6c8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
* Support transmit the OpenTelemetry Metrics protocol.
* Upgrade to GO 1.18.
* Add Docker images for arm64 architecture.
+* Support transmit Span Attached Event protocol data.
#### Bug Fixes
* Fix the missing return data when receive metrics in batch mode.
diff --git a/docs/en/setup/plugins/receiver_grpc-native-tracing-receiver.md b/docs/en/setup/plugins/receiver_grpc-native-tracing-receiver.md
index 1e01231..34006b5 100755
--- a/docs/en/setup/plugins/receiver_grpc-native-tracing-receiver.md
+++ b/docs/en/setup/plugins/receiver_grpc-native-tracing-receiver.md
@@ -1,6 +1,6 @@
# Receiver/grpc-native-tracing-receiver
## Description
-This is a receiver for SkyWalking native tracing format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/Tracing.proto.
+This is a receiver for SkyWalking native tracing and span attached event format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/Tracing.proto.
## Support Forwarders
- [native-tracing-grpc-forwarder](forwarder_native-tracing-grpc-forwarder.md)
## DefaultConfig
diff --git a/go.mod b/go.mod
index 7046877..149e455 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
github.com/enriquebris/goconcurrentqueue v0.6.0
github.com/google/go-cmp v0.5.8
github.com/grandecola/mmap v0.6.0
+ github.com/hashicorp/go-multierror v1.1.1
github.com/prometheus/client_golang v1.13.0
github.com/prometheus/common v0.37.0
github.com/prometheus/prometheus v0.39.1
@@ -21,7 +22,7 @@ require (
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v3 v3.0.1
k8s.io/apimachinery v0.25.1
- skywalking.apache.org/repo/goapi v0.0.0-20220824100816-9c0fee7e3581
+ skywalking.apache.org/repo/goapi v0.0.0-20221115073815-1d2a7c96c0b1
)
require (
@@ -51,6 +52,7 @@ require (
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 // indirect
+ github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
diff --git a/go.sum b/go.sum
index cd1e80f..a873fbd 100644
--- a/go.sum
+++ b/go.sum
@@ -284,11 +284,14 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod
h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod
h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.1.0/go.mod
h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod
h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
+github.com/hashicorp/errwrap v1.0.0
h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod
h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod
h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod
h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod
h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
+github.com/hashicorp/go-multierror v1.1.1
h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+github.com/hashicorp/go-multierror v1.1.1/go.mod
h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod
h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod
h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod
h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
@@ -604,13 +607,13 @@ golang.org/x/net
v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod
h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220920203100-d0c6ba3f52d9
h1:asZqf0wXastQr+DudYagQS8uBO8bHKeYD1vbAvGmFL8=
golang.org/x/net v0.0.0-20220920203100-d0c6ba3f52d9/go.mod
h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -710,7 +713,6 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod
h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -724,6 +726,7 @@ golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod
h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8
h1:h+EGohizhe9XlX18rfpa8k8RAc5XyaeamM+0VHRd4lc=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -1056,5 +1059,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.3
h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kF
sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod
h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
-skywalking.apache.org/repo/goapi v0.0.0-20220824100816-9c0fee7e3581
h1:Pst4hR7kYpUXauPPZFCcRtt84E/ccsflDoKLODjA2Uo=
-skywalking.apache.org/repo/goapi v0.0.0-20220824100816-9c0fee7e3581/go.mod
h1:ARcOiM3INd/UOU3Naeqkmo8WFS96jiHmSUawLoAXZjU=
+skywalking.apache.org/repo/goapi v0.0.0-20221115073815-1d2a7c96c0b1
h1:AeRofErSIajLM+TNbMi0XXV9DZ4WAz5I5lM5gNTDB74=
+skywalking.apache.org/repo/goapi v0.0.0-20221115073815-1d2a7c96c0b1/go.mod
h1:lxmYWY1uAP5SLVKNymAyDzn7KG6dhPWN+pYHmyt+0vo=
diff --git a/plugins/forwarder/grpc/nativetracing/forwarder.go b/plugins/forwarder/grpc/nativetracing/forwarder.go
index bd87c43..7ed84a8 100644
--- a/plugins/forwarder/grpc/nativetracing/forwarder.go
+++ b/plugins/forwarder/grpc/nativetracing/forwarder.go
@@ -23,6 +23,8 @@ import (
"io"
"reflect"
+ "github.com/hashicorp/go-multierror"
+
"google.golang.org/grpc"
"github.com/apache/skywalking-satellite/internal/pkg/config"
@@ -30,6 +32,7 @@ import (
"github.com/apache/skywalking-satellite/internal/satellite/event"
server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
+ v3 "skywalking.apache.org/repo/goapi/collect/common/v3"
agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)
@@ -42,7 +45,13 @@ const (
type Forwarder struct {
config.CommonFields
- tracingClient agent.TraceSegmentReportServiceClient
+ tracingClient agent.TraceSegmentReportServiceClient
+ attachedEventClient agent.SpanAttachedEventReportServiceClient
+}
+
+type streaming interface {
+ SendMsg(m interface{}) error
+ CloseAndRecv() (*v3.Commands, error)
}
func (f *Forwarder) Name() string {
@@ -68,39 +77,68 @@ func (f *Forwarder) Prepare(connection interface{}) error {
f.Name(), reflect.TypeOf(connection).String())
}
f.tracingClient = agent.NewTraceSegmentReportServiceClient(client)
+ f.attachedEventClient =
agent.NewSpanAttachedEventReportServiceClient(client)
return nil
}
func (f *Forwarder) Forward(batch event.BatchEvents) error {
- stream, err := f.tracingClient.Collect(context.Background())
- if err != nil {
- log.Logger.Errorf("open grpc stream error %v", err)
- return err
- }
+ var tracingStream agent.TraceSegmentReportService_CollectClient
+ var spanStream agent.SpanAttachedEventReportService_CollectClient
+
+ defer func() {
+ if err := closeStream(tracingStream, spanStream); err != nil {
+ log.Logger.Errorf("%s close stream error: %v",
f.Name(), err)
+ }
+ }()
+ var err error
+ var stream streaming
+ var streamData *server_grpc.OriginalData
for _, e := range batch {
- data, ok := e.GetData().(*v1.SniffData_Segment)
- if !ok {
+ switch data := e.GetData().(type) {
+ case *v1.SniffData_Segment:
+ if tracingStream == nil {
+ tracingStream, err =
f.tracingClient.Collect(context.Background())
+ if err != nil {
+ log.Logger.Errorf("open grpc stream
error %v", err)
+ return err
+ }
+ }
+ stream = tracingStream
+ streamData = server_grpc.NewOriginalData(data.Segment)
+ case *v1.SniffData_SpanAttachedEvent:
+ if spanStream == nil {
+ spanStream, err =
f.attachedEventClient.Collect(context.Background())
+ if err != nil {
+ log.Logger.Errorf("open grpc stream
error %v", err)
+ return err
+ }
+ }
+ stream = spanStream
+ streamData =
server_grpc.NewOriginalData(data.SpanAttachedEvent)
+ default:
continue
}
- err := stream.SendMsg(server_grpc.NewOriginalData(data.Segment))
+
+ err = stream.SendMsg(streamData)
if err != nil {
log.Logger.Errorf("%s send log data error: %v",
f.Name(), err)
- err = closeStream(stream)
- if err != nil {
- log.Logger.Errorf("%s close stream error: %v",
f.Name(), err)
- }
return err
}
}
- return closeStream(stream)
+ return closeStream(tracingStream, spanStream)
}
-func closeStream(stream agent.TraceSegmentReportService_CollectClient) error {
- _, err := stream.CloseAndRecv()
- if err != nil && err != io.EOF {
- return err
+func closeStream(streams ...streaming) error {
+ var err error
+ for _, s := range streams {
+ if s == nil {
+ continue
+ }
+ if _, e := s.CloseAndRecv(); e != nil && e != io.EOF {
+ err = multierror.Append(err, e)
+ }
}
- return nil
+ return err
}
func (f *Forwarder) ForwardType() v1.SniffType {
diff --git a/plugins/receiver/grpc/nativetracing/attached_event_service.go b/plugins/receiver/grpc/nativetracing/attached_event_service.go
new file mode 100644
index 0000000..ea0bef8
--- /dev/null
+++ b/plugins/receiver/grpc/nativetracing/attached_event_service.go
@@ -0,0 +1,58 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+ "io"
+ "time"
+
+ "github.com/apache/skywalking-satellite/plugins/server/grpc"
+
+ common "skywalking.apache.org/repo/goapi/collect/common/v3"
+ agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+ v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+type SpanAttachedEventReportService struct {
+ receiveChannel chan *v1.SniffData
+ agent.UnimplementedSpanAttachedEventReportServiceServer
+}
+
+func (s *SpanAttachedEventReportService) Collect(stream
agent.SpanAttachedEventReportService_CollectServer) error {
+ for {
+ recData := grpc.NewOriginalData(nil)
+ err := stream.RecvMsg(recData)
+ if err == io.EOF {
+ return stream.SendAndClose(&common.Commands{})
+ }
+ if err != nil {
+ return err
+ }
+ e := &v1.SniffData{
+ Name: eventName,
+ Timestamp: time.Now().UnixNano() / 1e6,
+ Meta: nil,
+ Type: v1.SniffType_TracingType,
+ Remote: true,
+ Data: &v1.SniffData_SpanAttachedEvent{
+ SpanAttachedEvent: recData.Content,
+ },
+ }
+ s.receiveChannel <- e
+ }
+}
diff --git a/plugins/receiver/grpc/nativetracing/receiver.go b/plugins/receiver/grpc/nativetracing/receiver.go
index 2231e2e..44ba20b 100644
--- a/plugins/receiver/grpc/nativetracing/receiver.go
+++ b/plugins/receiver/grpc/nativetracing/receiver.go
@@ -37,7 +37,8 @@ const (
type Receiver struct {
config.CommonFields
grpc.CommonGRPCReceiverFields
- service *TraceSegmentReportService
+ traceService *TraceSegmentReportService
+ spanAttachEventService *SpanAttachedEventReportService
}
func (r *Receiver) Name() string {
@@ -49,7 +50,7 @@ func (r *Receiver) ShowName() string {
}
func (r *Receiver) Description() string {
- return "This is a receiver for SkyWalking native tracing format, " +
+ return "This is a receiver for SkyWalking native tracing and span
attached event format, " +
"which is defined at
https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/Tracing.proto."
}
@@ -59,9 +60,11 @@ func (r *Receiver) DefaultConfig() string {
func (r *Receiver) RegisterHandler(server interface{}) {
r.CommonGRPCReceiverFields = *grpc.InitCommonGRPCReceiverFields(server)
- r.service = &TraceSegmentReportService{receiveChannel: r.OutputChannel}
- v3.RegisterTraceSegmentReportServiceServer(r.Server, r.service)
- v3_compat.RegisterTraceSegmentReportServiceServer(r.Server,
&TraceSegmentReportServiceCompat{reportService: r.service})
+ r.traceService = &TraceSegmentReportService{receiveChannel:
r.OutputChannel}
+ r.spanAttachEventService =
&SpanAttachedEventReportService{receiveChannel: r.OutputChannel}
+ v3.RegisterTraceSegmentReportServiceServer(r.Server, r.traceService)
+ v3.RegisterSpanAttachedEventReportServiceServer(r.Server,
r.spanAttachEventService)
+ v3_compat.RegisterTraceSegmentReportServiceServer(r.Server,
&TraceSegmentReportServiceCompat{reportService: r.traceService})
}
func (r *Receiver) RegisterSyncInvoker(_ module.SyncInvoker) {