This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push: new aed63e2 add meter pipe (#56) aed63e2 is described below commit aed63e237568f861214913271022ab1230e76b1d Author: Evan <31562192+evan...@users.noreply.github.com> AuthorDate: Thu Jul 1 23:30:43 2021 +0800 add meter pipe (#56) --- configs/satellite_config.yaml | 36 +++++++++-- ....md => forwarder_nativemeter-grpc-forwarder.md} | 2 +- docs/en/setup/plugins/plugin-list.md | 3 +- .../plugins/receiver_grpc-nativemeter-receiver.md | 5 ++ docs/menu.yml | 6 +- plugins/forwarder/forwarder_repository.go | 2 +- .../nativecds/{sync_forwarder.go => forwarder.go} | 0 .../nativelog/{sync_forwarder.go => forwarder.go} | 0 .../{sync_forwarder.go => forwarder.go} | 0 .../sync_forwarder.go => nativemeter/forwarder.go} | 36 +++++++---- .../{sync_forwarder.go => forwarder.go} | 0 .../{sync_forwarder.go => forwarder.go} | 0 .../nativelog/{sync_forwarder.go => forwarder.go} | 0 plugins/receiver/grpc/nativemeter/meter_service.go | 68 ++++++++++++++++++++ plugins/receiver/grpc/nativemeter/receiver.go | 61 ++++++++++++++++++ plugins/receiver/grpc/nativemeter/receiver_test.go | 74 ++++++++++++++++++++++ plugins/receiver/receiver_repository.go | 2 + 17 files changed, 271 insertions(+), 24 deletions(-) diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml index 20cacc5..b7b64f6 100644 --- a/configs/satellite_config.yaml +++ b/configs/satellite_config.yaml @@ -195,11 +195,11 @@ pipes: fallbacker: plugin_name: none-fallbacker # The time interval between two flush operations. And the time unit is millisecond. - flush_time: ${SATELLITE_TRACINGPIPE_SENDER_FLUSH_TIME:1000} + flush_time: ${SATELLITE_EVENTPIPE_SENDER_FLUSH_TIME:1000} # The maximum buffer elements. - max_buffer_size: ${SATELLITE_TRACINGPIPE_SENDER_MAX_BUFFER_SIZE:200} + max_buffer_size: ${SATELLITE_EVENTPIPE_SENDER_MAX_BUFFER_SIZE:200} # The minimum flush elements. 
- min_flush_events: ${SATELLITE_TRACINGPIPE_SENDER_MIN_FLUSH_EVENTS:1} + min_flush_events: ${SATELLITE_EVENTPIPE_SENDER_MIN_FLUSH_EVENTS:1} client_name: grpc-client forwarders: - plugin_name: nativeevent-grpc-forwarder @@ -217,11 +217,33 @@ pipes: fallbacker: plugin_name: none-fallbacker # The time interval between two flush operations. And the time unit is millisecond. - flush_time: ${SATELLITE_TRACINGPIPE_SENDER_FLUSH_TIME:1000} + flush_time: ${SATELLITE_JVMPIPE_SENDER_FLUSH_TIME:1000} # The maximum buffer elements. - max_buffer_size: ${SATELLITE_TRACINGPIPE_SENDER_MAX_BUFFER_SIZE:200} + max_buffer_size: ${SATELLITE_JVMPIPE_SENDER_MAX_BUFFER_SIZE:200} # The minimum flush elements. - min_flush_events: ${SATELLITE_TRACINGPIPE_SENDER_MIN_FLUSH_EVENTS:1} + min_flush_events: ${SATELLITE_JVMPIPE_SENDER_MIN_FLUSH_EVENTS:1} + client_name: grpc-client + forwarders: + - plugin_name: nativejvm-grpc-forwarder + - common_config: + pipe_name: meterpipe + gatherer: + server_name: "grpc-server" + receiver: + plugin_name: "grpc-nativemeter-receiver" + queue: + plugin_name: "memory-queue" + processor: + filters: + sender: + fallbacker: + plugin_name: none-fallbacker + # The time interval between two flush operations. And the time unit is millisecond. + flush_time: ${SATELLITE_METERPIPE_SENDER_FLUSH_TIME:1000} + # The maximum buffer elements. + max_buffer_size: ${SATELLITE_METERPIPE_SENDER_MAX_BUFFER_SIZE:200} + # The minimum flush elements. 
+ min_flush_events: ${SATELLITE_METERPIPE_SENDER_MIN_FLUSH_EVENTS:1} client_name: grpc-client forwarders: - - plugin_name: nativejvm-grpc-forwarder \ No newline at end of file + - plugin_name: nativemeter-grpc-forwarder diff --git a/docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md b/docs/en/setup/plugins/forwarder_nativemeter-grpc-forwarder.md similarity index 76% rename from docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md rename to docs/en/setup/plugins/forwarder_nativemeter-grpc-forwarder.md index 83005b0..3a8d083 100755 --- a/docs/en/setup/plugins/forwarder_meter-grpc-forwarder.md +++ b/docs/en/setup/plugins/forwarder_nativemeter-grpc-forwarder.md @@ -1,4 +1,4 @@ -# Forwarder/meter-grpc-forwarder +# Forwarder/nativemeter-grpc-forwarder ## Description This is a synchronization meter grpc forwarder with the SkyWalking meter protocol. ## DefaultConfig diff --git a/docs/en/setup/plugins/plugin-list.md b/docs/en/setup/plugins/plugin-list.md index 6dca7e3..a8b97ec 100755 --- a/docs/en/setup/plugins/plugin-list.md +++ b/docs/en/setup/plugins/plugin-list.md @@ -9,13 +9,13 @@ - [prometheus-metrics-fetcher](./fetcher_prometheus-metrics-fetcher.md) - Filter - Forwarder - - [meter-grpc-forwarder](./forwarder_meter-grpc-forwarder.md) - [nativecds-grpc-forwarder](./forwarder_nativecds-grpc-forwarder.md) - [nativeevent-grpc-forwarder](./forwarder_nativeevent-grpc-forwarder.md) - [nativejvm-grpc-forwarder](./forwarder_nativejvm-grpc-forwarder.md) - [nativelog-grpc-forwarder](./forwarder_nativelog-grpc-forwarder.md) - [nativelog-kafka-forwarder](./forwarder_nativelog-kafka-forwarder.md) - [nativemanagement-grpc-forwarder](./forwarder_nativemanagement-grpc-forwarder.md) + - [nativemeter-grpc-forwarder](./forwarder_nativemeter-grpc-forwarder.md) - [nativeprofile-grpc-forwarder](./forwarder_nativeprofile-grpc-forwarder.md) - [nativetracing-grpc-forwarder](./forwarder_nativetracing-grpc-forwarder.md) - Parser @@ -29,6 +29,7 @@ - 
[grpc-nativejvm-receiver](./receiver_grpc-nativejvm-receiver.md) - [grpc-nativelog-receiver](./receiver_grpc-nativelog-receiver.md) - [grpc-nativemanagement-receiver](./receiver_grpc-nativemanagement-receiver.md) + - [grpc-nativemeter-receiver](./receiver_grpc-nativemeter-receiver.md) - [grpc-nativeprofile-receiver](./receiver_grpc-nativeprofile-receiver.md) - [grpc-nativetracing-receiver](./receiver_grpc-nativetracing-receiver.md) - [http-nativelog-receiver](./receiver_http-nativelog-receiver.md) diff --git a/docs/en/setup/plugins/receiver_grpc-nativemeter-receiver.md b/docs/en/setup/plugins/receiver_grpc-nativemeter-receiver.md new file mode 100755 index 0000000..8988fcd --- /dev/null +++ b/docs/en/setup/plugins/receiver_grpc-nativemeter-receiver.md @@ -0,0 +1,5 @@ +# Receiver/grpc-nativemeter-receiver +## Description +This is a receiver for SkyWalking native meter format, which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/event/Event.proto. +## DefaultConfig +```yaml``` diff --git a/docs/menu.yml b/docs/menu.yml index 0f20958..803c3cc 100644 --- a/docs/menu.yml +++ b/docs/menu.yml @@ -69,8 +69,6 @@ catalog: path: /en/setup/plugins/fetcher_prometheus-metrics-fetcher - name: forwarder catalog: - - name: meter-grpc-forwarder - path: /en/setup/plugins/forwarder_meter-grpc-forwarder - name: nativecds-grpc-forwarder path: /en/setup/plugins/forwarder_nativecds-grpc-forwarder - name: nativeevent-grpc-forwarder @@ -83,6 +81,8 @@ catalog: path: /en/setup/plugins/forwarder_nativelog-kafka-forwarder - name: nativemanagement-grpc-forwarder path: /en/setup/plugins/forwarder_nativemanagement-grpc-forwarder + - name: nativemeter-grpc-forwarder + path: /en/setup/plugins/forwarder_nativemeter-grpc-forwarder - name: nativeprofile-grpc-forwarder path: /en/setup/plugins/forwarder_nativeprofile-grpc-forwarder - name: nativetracing-grpc-forwarder @@ -107,6 +107,8 @@ catalog: path: /en/setup/plugins/receiver_grpc-nativelog-receiver - name: 
grpc-nativemanagement-receiver path: /en/setup/plugins/receiver_grpc-nativemanagement-receiver + - name: grpc-nativemeter-receiver + path: /en/setup/plugins/receiver_grpc-nativemeter-receiver - name: grpc-nativeprofile-receiver path: /en/setup/plugins/receiver_grpc-nativeprofile-receiver - name: grpc-nativetracing-receiver diff --git a/plugins/forwarder/forwarder_repository.go b/plugins/forwarder/forwarder_repository.go index f9419f9..856cdb3 100644 --- a/plugins/forwarder/forwarder_repository.go +++ b/plugins/forwarder/forwarder_repository.go @@ -20,12 +20,12 @@ package forwarder import ( "reflect" - grpc_meter "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/meter" grpc_nativecds "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativecds" grpc_nativeevent "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeevent" grpc_nativejvm "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativejvm" grpc_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog" grpc_nativemanagement "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativemanagement" + grpc_meter "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativemeter" grpc_nativeprofile "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeprofile" grpc_nativetracing "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativetracing" kafka_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/kafka/nativelog" diff --git a/plugins/forwarder/grpc/nativecds/sync_forwarder.go b/plugins/forwarder/grpc/nativecds/forwarder.go similarity index 100% rename from plugins/forwarder/grpc/nativecds/sync_forwarder.go rename to plugins/forwarder/grpc/nativecds/forwarder.go diff --git a/plugins/forwarder/grpc/nativelog/sync_forwarder.go b/plugins/forwarder/grpc/nativelog/forwarder.go similarity index 100% rename from plugins/forwarder/grpc/nativelog/sync_forwarder.go rename to 
plugins/forwarder/grpc/nativelog/forwarder.go diff --git a/plugins/forwarder/grpc/nativemanagement/sync_forwarder.go b/plugins/forwarder/grpc/nativemanagement/forwarder.go similarity index 100% rename from plugins/forwarder/grpc/nativemanagement/sync_forwarder.go rename to plugins/forwarder/grpc/nativemanagement/forwarder.go diff --git a/plugins/forwarder/grpc/meter/sync_forwarder.go b/plugins/forwarder/grpc/nativemeter/forwarder.go similarity index 79% rename from plugins/forwarder/grpc/meter/sync_forwarder.go rename to plugins/forwarder/grpc/nativemeter/forwarder.go index 5df933d..86a8b42 100644 --- a/plugins/forwarder/grpc/meter/sync_forwarder.go +++ b/plugins/forwarder/grpc/nativemeter/forwarder.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package meter +package nativemeter import ( "context" @@ -33,7 +33,7 @@ import ( "github.com/apache/skywalking-satellite/internal/satellite/event" ) -const Name = "meter-grpc-forwarder" +const Name = "nativemeter-grpc-forwarder" type Forwarder struct { config.CommonFields @@ -63,27 +63,39 @@ func (f *Forwarder) Prepare(connection interface{}) error { } func (f *Forwarder) Forward(batch event.BatchEvents) error { - stream, err := f.meterClient.Collect(context.Background()) - if err != nil { - log.Logger.Errorf("open grpc stream error %v", err) - return err - } + streamMap := make(map[string]v3.MeterReportService_CollectClient) + defer func() { + for _, stream := range streamMap { + err := closeStream(stream) + if err != nil { + log.Logger.Warnf("%s close stream error: %v", f.Name(), err) + } + } + }() for _, e := range batch { data, ok := e.GetData().(*v1.SniffData_Meter) if !ok { continue } + streamName := fmt.Sprintf("%s_%s", data.Meter.Service, data.Meter.ServiceInstance) + stream := streamMap[streamName] + if stream == nil { + curStream, err := f.meterClient.Collect(context.Background()) + if err != nil { + log.Logger.Errorf("open grpc stream error %v", err) + 
return err + } + streamMap[streamName] = curStream + stream = curStream + } + err := stream.Send(data.Meter) if err != nil { log.Logger.Errorf("%s send meter data error: %v", f.Name(), err) - err = closeStream(stream) - if err != nil { - log.Logger.Errorf("%s close stream error: %v", f.Name(), err) - } return err } } - return closeStream(stream) + return nil } func closeStream(stream v3.MeterReportService_CollectClient) error { diff --git a/plugins/forwarder/grpc/nativeprofile/sync_forwarder.go b/plugins/forwarder/grpc/nativeprofile/forwarder.go similarity index 100% rename from plugins/forwarder/grpc/nativeprofile/sync_forwarder.go rename to plugins/forwarder/grpc/nativeprofile/forwarder.go diff --git a/plugins/forwarder/grpc/nativetracing/sync_forwarder.go b/plugins/forwarder/grpc/nativetracing/forwarder.go similarity index 100% rename from plugins/forwarder/grpc/nativetracing/sync_forwarder.go rename to plugins/forwarder/grpc/nativetracing/forwarder.go diff --git a/plugins/forwarder/kafka/nativelog/sync_forwarder.go b/plugins/forwarder/kafka/nativelog/forwarder.go similarity index 100% rename from plugins/forwarder/kafka/nativelog/sync_forwarder.go rename to plugins/forwarder/kafka/nativelog/forwarder.go diff --git a/plugins/receiver/grpc/nativemeter/meter_service.go b/plugins/receiver/grpc/nativemeter/meter_service.go new file mode 100644 index 0000000..0a101cf --- /dev/null +++ b/plugins/receiver/grpc/nativemeter/meter_service.go @@ -0,0 +1,68 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package nativeevent + +import ( + "io" + "time" + + common "skywalking.apache.org/repo/goapi/collect/common/v3" + meter "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" +) + +const eventName = "grpc-nativemeter-event" + +type MeterService struct { + receiveChannel chan *v1.SniffData + meter.UnimplementedMeterReportServiceServer +} + +func (m *MeterService) Collect(stream meter.MeterReportService_CollectServer) error { + var service, instance string + for { + item, err := stream.Recv() + if err == io.EOF { + return stream.SendAndClose(&common.Commands{}) + } + if err != nil { + return err + } + // only first item has service and service instance property + // need correlate information to each item + if item.Service != "" { + service = item.Service + } + if item.ServiceInstance != "" { + instance = item.ServiceInstance + } + item.Service = service + item.ServiceInstance = instance + d := &v1.SniffData{ + Name: eventName, + Timestamp: time.Now().UnixNano() / 1e6, + Meta: nil, + Type: v1.SniffType_MeterType, + Remote: true, + Data: &v1.SniffData_Meter{ + Meter: item, + }, + } + m.receiveChannel <- d + } +} diff --git a/plugins/receiver/grpc/nativemeter/receiver.go b/plugins/receiver/grpc/nativemeter/receiver.go new file mode 100644 index 0000000..346a230 --- /dev/null +++ b/plugins/receiver/grpc/nativemeter/receiver.go @@ -0,0 +1,61 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package nativeevent + +import ( + "github.com/apache/skywalking-satellite/internal/pkg/config" + module "github.com/apache/skywalking-satellite/internal/satellite/module/api" + "github.com/apache/skywalking-satellite/plugins/receiver/grpc" + + meter "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" +) + +const Name = "grpc-nativemeter-receiver" + +type Receiver struct { + config.CommonFields + grpc.CommonGRPCReceiverFields + service *MeterService +} + +func (r *Receiver) Name() string { + return Name +} + +func (r *Receiver) Description() string { + return "This is a receiver for SkyWalking native meter format, " + + "which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/event/Event.proto." 
+} + +func (r *Receiver) DefaultConfig() string { + return "" +} + +func (r *Receiver) RegisterHandler(server interface{}) { + r.CommonGRPCReceiverFields = *grpc.InitCommonGRPCReceiverFields(server) + r.service = &MeterService{receiveChannel: r.OutputChannel} + meter.RegisterMeterReportServiceServer(r.Server, r.service) +} + +func (r *Receiver) RegisterSyncInvoker(_ module.SyncInvoker) { +} + +func (r *Receiver) Channel() <-chan *v1.SniffData { + return r.OutputChannel +} diff --git a/plugins/receiver/grpc/nativemeter/receiver_test.go b/plugins/receiver/grpc/nativemeter/receiver_test.go new file mode 100644 index 0000000..91c6a2d --- /dev/null +++ b/plugins/receiver/grpc/nativemeter/receiver_test.go @@ -0,0 +1,74 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package nativeevent + +import ( + "context" + "strconv" + "testing" + "time" + + "google.golang.org/grpc" + + meter "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + v1 "skywalking.apache.org/repo/goapi/satellite/data/v1" + + _ "github.com/apache/skywalking-satellite/internal/satellite/test" + receiver_grpc "github.com/apache/skywalking-satellite/plugins/receiver/grpc" +) + +func TestReceiver_RegisterHandler(t *testing.T) { + receiver_grpc.TestReceiver(new(Receiver), func(t *testing.T, sequence int, conn *grpc.ClientConn, ctx context.Context) string { + client := meter.NewMeterReportServiceClient(conn) + data := initData(sequence) + collect, err := client.Collect(ctx) + if err != nil { + t.Fatalf("cannot open the stream send mode: %v", err) + } + if err := collect.Send(data); err != nil { + t.Fatalf("cannot send the data to the server: %v", err) + } + if err := collect.CloseSend(); err != nil { + t.Fatalf("cannot close the stream mode: %v", err) + } + return data.String() + }, func(data *v1.SniffData) string { + return data.GetMeter().String() + }, t) +} + +func initData(sequence int) *meter.MeterData { + seq := strconv.Itoa(sequence) + return &meter.MeterData{ + Service: "demo-service" + seq, + ServiceInstance: "demo-instance" + seq, + Timestamp: time.Now().Unix() / 1e6, + Metric: &meter.MeterData_SingleValue{ + SingleValue: &meter.MeterSingleValue{ + Name: "name" + seq, + Value: float64(sequence), + Labels: []*meter.Label{ + { + Name: "label-name" + seq, + Value: "label-value" + seq, + }, + }, + }, + }, + } +} diff --git a/plugins/receiver/receiver_repository.go b/plugins/receiver/receiver_repository.go index e294fd3..f485530 100644 --- a/plugins/receiver/receiver_repository.go +++ b/plugins/receiver/receiver_repository.go @@ -27,6 +27,7 @@ import ( grpcnativejvm "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativejvm" grpcnavtivelog "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativelog" grpcnativemanagement 
"github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativemanagement" + grpcnativemeter "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativemeter" grpcnativeprofile "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeprofile" grpcnativetracing "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativetracing" httpnavtivelog "github.com/apache/skywalking-satellite/plugins/receiver/http/nativcelog" @@ -45,6 +46,7 @@ func RegisterReceiverPlugins() { new(httpnavtivelog.Receiver), new(grpcnativejvm.Receiver), new(grpcnativeevent.Receiver), + new(grpcnativemeter.Receiver), } for _, receiver := range receivers { plugin.RegisterPlugin(receiver)