This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 64e44b2  Support transmit Span Attached Event protocol data. (#121)
64e44b2 is described below

commit 64e44b2f31fc1afd84e341d085f0bb1725e74c09
Author: mrproliu <[email protected]>
AuthorDate: Tue Nov 15 19:40:30 2022 +0800

    Support transmit Span Attached Event protocol data. (#121)
---
 CHANGES.md                                         |  1 +
 .../receiver_grpc-native-tracing-receiver.md       |  2 +-
 go.mod                                             |  4 +-
 go.sum                                             | 11 ++--
 plugins/forwarder/grpc/nativetracing/forwarder.go  | 76 ++++++++++++++++------
 .../grpc/nativetracing/attached_event_service.go   | 58 +++++++++++++++++
 plugins/receiver/grpc/nativetracing/receiver.go    | 13 ++--
 7 files changed, 135 insertions(+), 30 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index bf56e8c..889f6c8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
 * Support transmit the OpenTelemetry Metrics protocol.
 * Upgrade to GO 1.18.
 * Add Docker images for arm64 architecture.
+* Support transmit Span Attached Event protocol data.
 
 #### Bug Fixes
 * Fix the missing return data when receive metrics in batch mode.
diff --git a/docs/en/setup/plugins/receiver_grpc-native-tracing-receiver.md b/docs/en/setup/plugins/receiver_grpc-native-tracing-receiver.md
index 1e01231..34006b5 100755
--- a/docs/en/setup/plugins/receiver_grpc-native-tracing-receiver.md
+++ b/docs/en/setup/plugins/receiver_grpc-native-tracing-receiver.md
@@ -1,6 +1,6 @@
 # Receiver/grpc-native-tracing-receiver
 ## Description
-This is a receiver for SkyWalking native tracing format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/Tracing.proto.
+This is a receiver for SkyWalking native tracing and span attached event format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/Tracing.proto.
 ## Support Forwarders
  - [native-tracing-grpc-forwarder](forwarder_native-tracing-grpc-forwarder.md)
 ## DefaultConfig
diff --git a/go.mod b/go.mod
index 7046877..149e455 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
        github.com/enriquebris/goconcurrentqueue v0.6.0
        github.com/google/go-cmp v0.5.8
        github.com/grandecola/mmap v0.6.0
+       github.com/hashicorp/go-multierror v1.1.1
        github.com/prometheus/client_golang v1.13.0
        github.com/prometheus/common v0.37.0
        github.com/prometheus/prometheus v0.39.1
@@ -21,7 +22,7 @@ require (
        google.golang.org/protobuf v1.28.1
        gopkg.in/yaml.v3 v3.0.1
        k8s.io/apimachinery v0.25.1
-       skywalking.apache.org/repo/goapi v0.0.0-20220824100816-9c0fee7e3581
+       skywalking.apache.org/repo/goapi v0.0.0-20221115073815-1d2a7c96c0b1
 )
 
 require (
@@ -51,6 +52,7 @@ require (
        github.com/google/gnostic v0.5.7-v3refs // indirect
        github.com/google/gofuzz v1.2.0 // indirect
        github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 // indirect
+       github.com/hashicorp/errwrap v1.0.0 // indirect
        github.com/hashicorp/go-uuid v1.0.2 // indirect
        github.com/hashicorp/hcl v1.0.0 // indirect
        github.com/imdario/mergo v0.3.12 // indirect
diff --git a/go.sum b/go.sum
index cd1e80f..a873fbd 100644
--- a/go.sum
+++ b/go.sum
@@ -284,11 +284,14 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod 
h1:vNeuVxBJEsws4ogUvrchl83t
 github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod 
h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
 github.com/hashicorp/consul/api v1.1.0/go.mod 
h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
 github.com/hashicorp/consul/sdk v0.1.1/go.mod 
h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
+github.com/hashicorp/errwrap v1.0.0 
h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
 github.com/hashicorp/errwrap v1.0.0/go.mod 
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/go-cleanhttp v0.5.1/go.mod 
h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
 github.com/hashicorp/go-immutable-radix v1.0.0/go.mod 
h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
 github.com/hashicorp/go-msgpack v0.5.3/go.mod 
h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
 github.com/hashicorp/go-multierror v1.0.0/go.mod 
h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
+github.com/hashicorp/go-multierror v1.1.1 
h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+github.com/hashicorp/go-multierror v1.1.1/go.mod 
h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
 github.com/hashicorp/go-rootcerts v1.0.0/go.mod 
h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
 github.com/hashicorp/go-sockaddr v1.0.0/go.mod 
h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
 github.com/hashicorp/go-syslog v1.0.0/go.mod 
h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
@@ -604,13 +607,13 @@ golang.org/x/net 
v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
 golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod 
h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM=
 golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod 
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod 
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
 golang.org/x/net v0.0.0-20220920203100-d0c6ba3f52d9 
h1:asZqf0wXastQr+DudYagQS8uBO8bHKeYD1vbAvGmFL8=
 golang.org/x/net v0.0.0-20220920203100-d0c6ba3f52d9/go.mod 
h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -710,7 +713,6 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod 
h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -724,6 +726,7 @@ golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod 
h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 
h1:h+EGohizhe9XlX18rfpa8k8RAc5XyaeamM+0VHRd4lc=
 golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -1056,5 +1059,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.3 
h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kF
 sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod 
h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E=
 sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
 sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
-skywalking.apache.org/repo/goapi v0.0.0-20220824100816-9c0fee7e3581 
h1:Pst4hR7kYpUXauPPZFCcRtt84E/ccsflDoKLODjA2Uo=
-skywalking.apache.org/repo/goapi v0.0.0-20220824100816-9c0fee7e3581/go.mod 
h1:ARcOiM3INd/UOU3Naeqkmo8WFS96jiHmSUawLoAXZjU=
+skywalking.apache.org/repo/goapi v0.0.0-20221115073815-1d2a7c96c0b1 
h1:AeRofErSIajLM+TNbMi0XXV9DZ4WAz5I5lM5gNTDB74=
+skywalking.apache.org/repo/goapi v0.0.0-20221115073815-1d2a7c96c0b1/go.mod 
h1:lxmYWY1uAP5SLVKNymAyDzn7KG6dhPWN+pYHmyt+0vo=
diff --git a/plugins/forwarder/grpc/nativetracing/forwarder.go b/plugins/forwarder/grpc/nativetracing/forwarder.go
index bd87c43..7ed84a8 100644
--- a/plugins/forwarder/grpc/nativetracing/forwarder.go
+++ b/plugins/forwarder/grpc/nativetracing/forwarder.go
@@ -23,6 +23,8 @@ import (
        "io"
        "reflect"
 
+       "github.com/hashicorp/go-multierror"
+
        "google.golang.org/grpc"
 
        "github.com/apache/skywalking-satellite/internal/pkg/config"
@@ -30,6 +32,7 @@ import (
        "github.com/apache/skywalking-satellite/internal/satellite/event"
        server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
 
+       v3 "skywalking.apache.org/repo/goapi/collect/common/v3"
        agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
 )
@@ -42,7 +45,13 @@ const (
 type Forwarder struct {
        config.CommonFields
 
-       tracingClient agent.TraceSegmentReportServiceClient
+       tracingClient       agent.TraceSegmentReportServiceClient
+       attachedEventClient agent.SpanAttachedEventReportServiceClient
+}
+
+type streaming interface {
+       SendMsg(m interface{}) error
+       CloseAndRecv() (*v3.Commands, error)
 }
 
 func (f *Forwarder) Name() string {
@@ -68,39 +77,68 @@ func (f *Forwarder) Prepare(connection interface{}) error {
                        f.Name(), reflect.TypeOf(connection).String())
        }
        f.tracingClient = agent.NewTraceSegmentReportServiceClient(client)
+       f.attachedEventClient = agent.NewSpanAttachedEventReportServiceClient(client)
        return nil
 }
 
 func (f *Forwarder) Forward(batch event.BatchEvents) error {
-       stream, err := f.tracingClient.Collect(context.Background())
-       if err != nil {
-               log.Logger.Errorf("open grpc stream error %v", err)
-               return err
-       }
+       var tracingStream agent.TraceSegmentReportService_CollectClient
+       var spanStream agent.SpanAttachedEventReportService_CollectClient
+
+       defer func() {
+               if err := closeStream(tracingStream, spanStream); err != nil {
+                       log.Logger.Errorf("%s close stream error: %v", f.Name(), err)
+               }
+       }()
+       var err error
+       var stream streaming
+       var streamData *server_grpc.OriginalData
        for _, e := range batch {
-               data, ok := e.GetData().(*v1.SniffData_Segment)
-               if !ok {
+               switch data := e.GetData().(type) {
+               case *v1.SniffData_Segment:
+                       if tracingStream == nil {
+                               tracingStream, err = f.tracingClient.Collect(context.Background())
+                               if err != nil {
+                                       log.Logger.Errorf("open grpc stream error %v", err)
+                                       return err
+                               }
+                       }
+                       stream = tracingStream
+                       streamData = server_grpc.NewOriginalData(data.Segment)
+               case *v1.SniffData_SpanAttachedEvent:
+                       if spanStream == nil {
+                               spanStream, err = f.attachedEventClient.Collect(context.Background())
+                               if err != nil {
+                                       log.Logger.Errorf("open grpc stream error %v", err)
+                                       return err
+                               }
+                       }
+                       stream = spanStream
+                       streamData = server_grpc.NewOriginalData(data.SpanAttachedEvent)
+               default:
                        continue
                }
-               err := stream.SendMsg(server_grpc.NewOriginalData(data.Segment))
+
+               err = stream.SendMsg(streamData)
                if err != nil {
                         log.Logger.Errorf("%s send log data error: %v", f.Name(), err)
-                       err = closeStream(stream)
-                       if err != nil {
-                               log.Logger.Errorf("%s close stream error: %v", f.Name(), err)
-                       }
                        return err
                }
        }
-       return closeStream(stream)
+       return closeStream(tracingStream, spanStream)
 }
 
-func closeStream(stream agent.TraceSegmentReportService_CollectClient) error {
-       _, err := stream.CloseAndRecv()
-       if err != nil && err != io.EOF {
-               return err
+func closeStream(streams ...streaming) error {
+       var err error
+       for _, s := range streams {
+               if s == nil {
+                       continue
+               }
+               if _, e := s.CloseAndRecv(); e != nil && e != io.EOF {
+                       err = multierror.Append(err, e)
+               }
        }
-       return nil
+       return err
 }
 
 func (f *Forwarder) ForwardType() v1.SniffType {
diff --git a/plugins/receiver/grpc/nativetracing/attached_event_service.go b/plugins/receiver/grpc/nativetracing/attached_event_service.go
new file mode 100644
index 0000000..ea0bef8
--- /dev/null
+++ b/plugins/receiver/grpc/nativetracing/attached_event_service.go
@@ -0,0 +1,58 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativetracing
+
+import (
+       "io"
+       "time"
+
+       "github.com/apache/skywalking-satellite/plugins/server/grpc"
+
+       common "skywalking.apache.org/repo/goapi/collect/common/v3"
+       agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+       v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+type SpanAttachedEventReportService struct {
+       receiveChannel chan *v1.SniffData
+       agent.UnimplementedSpanAttachedEventReportServiceServer
+}
+
+func (s *SpanAttachedEventReportService) Collect(stream agent.SpanAttachedEventReportService_CollectServer) error {
+       for {
+               recData := grpc.NewOriginalData(nil)
+               err := stream.RecvMsg(recData)
+               if err == io.EOF {
+                       return stream.SendAndClose(&common.Commands{})
+               }
+               if err != nil {
+                       return err
+               }
+               e := &v1.SniffData{
+                       Name:      eventName,
+                       Timestamp: time.Now().UnixNano() / 1e6,
+                       Meta:      nil,
+                       Type:      v1.SniffType_TracingType,
+                       Remote:    true,
+                       Data: &v1.SniffData_SpanAttachedEvent{
+                               SpanAttachedEvent: recData.Content,
+                       },
+               }
+               s.receiveChannel <- e
+       }
+}
diff --git a/plugins/receiver/grpc/nativetracing/receiver.go b/plugins/receiver/grpc/nativetracing/receiver.go
index 2231e2e..44ba20b 100644
--- a/plugins/receiver/grpc/nativetracing/receiver.go
+++ b/plugins/receiver/grpc/nativetracing/receiver.go
@@ -37,7 +37,8 @@ const (
 type Receiver struct {
        config.CommonFields
        grpc.CommonGRPCReceiverFields
-       service *TraceSegmentReportService
+       traceService           *TraceSegmentReportService
+       spanAttachEventService *SpanAttachedEventReportService
 }
 
 func (r *Receiver) Name() string {
@@ -49,7 +50,7 @@ func (r *Receiver) ShowName() string {
 }
 
 func (r *Receiver) Description() string {
-       return "This is a receiver for SkyWalking native tracing format, " +
+       return "This is a receiver for SkyWalking native tracing and span attached event format, " +
                 "which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/Tracing.proto.";
 }
 
@@ -59,9 +60,11 @@ func (r *Receiver) DefaultConfig() string {
 
 func (r *Receiver) RegisterHandler(server interface{}) {
        r.CommonGRPCReceiverFields = *grpc.InitCommonGRPCReceiverFields(server)
-       r.service = &TraceSegmentReportService{receiveChannel: r.OutputChannel}
-       v3.RegisterTraceSegmentReportServiceServer(r.Server, r.service)
-       v3_compat.RegisterTraceSegmentReportServiceServer(r.Server, &TraceSegmentReportServiceCompat{reportService: r.service})
+       r.traceService = &TraceSegmentReportService{receiveChannel: r.OutputChannel}
+       r.spanAttachEventService = &SpanAttachedEventReportService{receiveChannel: r.OutputChannel}
+       v3.RegisterTraceSegmentReportServiceServer(r.Server, r.traceService)
+       v3.RegisterSpanAttachedEventReportServiceServer(r.Server, r.spanAttachEventService)
+       v3_compat.RegisterTraceSegmentReportServiceServer(r.Server, &TraceSegmentReportServiceCompat{reportService: r.traceService})
 }
 
 func (r *Receiver) RegisterSyncInvoker(_ module.SyncInvoker) {

Reply via email to