This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new 4278acb Support gRPC client and transmit native log (#44)
4278acb is described below
commit 4278acb89565466d5812ab00b231493be2c35503
Author: mrproliu <[email protected]>
AuthorDate: Thu Jun 3 21:07:59 2021 +0800
Support gRPC client and transmit native log (#44)
Co-authored-by: dalekliuhan <[email protected]>
---
CHANGES.md | 4 +-
configs/satellite_config.yaml | 28 ++--
docs/en/setup/plugins/client_grpc-client.md | 29 +++++
.../plugins/forwarder_nativelog-grpc-forwarder.md | 5 +
docs/en/setup/plugins/plugin-list.md | 2 +
internal/satellite/config/loader_test.go | 141 ++++++++++----------
plugins/client/client_repository.go | 3 +
plugins/client/grpc/client.go | 142 +++++++++++++++++++++
plugins/client/grpc/client_config.go | 123 ++++++++++++++++++
plugins/client/grpc/client_sniffer.go | 73 +++++++++++
plugins/forwarder/forwarder_repository.go | 7 +-
plugins/forwarder/grpc/nativelog/sync_forwarder.go | 101 +++++++++++++++
12 files changed, 576 insertions(+), 82 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 96ea4f7..e599503 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,7 +5,9 @@ Release Notes.
0.2.0
------------------
#### Features
-Update protoc-gen-go version to 1.26.0.
+* Update protoc-gen-go version to 1.26.0.
+* Add grpc client plugin.
+* Add nativelog-grpc-forwarder plugin.
#### Bug Fixes
Fix the data race in mmap queue.
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index b79e614..8eea955 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -36,19 +36,23 @@ telemetry:
# The sharing plugins referenced by the specific plugins in the different
pipes.
sharing:
clients:
- - plugin_name: "kafka-client"
- # The Kafka broker addresses (default localhost:9092). Multiple values
are separated by commas.
- brokers: ${SATELLITE_KAFKA_CLIENT_BROKERS:127.0.0.1:9092}
- # The Kakfa version should follow this pattern, which is
major_minor_veryMinor_patch.
- version: ${SATELLITE_KAFKA_VERSION:"2.1.1"}
+ - plugin_name: "grpc-client"
+ # The gRPC server address (default localhost:11800).
+ server_addr: ${SATELLITE_GRPC_CLIENT:127.0.0.1:11800}
# The TLS switch
- enable_TLS: ${SATELLITE_KAFKA_ENABLE_TLS:false}
+ enable_TLS: ${SATELLITE_GRPC_ENABLE_TLS:false}
# The file path of client.pem. The config only works when opening the
TLS switch.
- client_pem_path: ${SATELLITE_KAFKA_CLIENT_PEM_PATH:"client.pem"}
+ client_pem_path: ${SATELLITE_GRPC_CLIENT_PEM_PATH:"client.pem"}
# The file path of client.key. The config only works when opening the
TLS switch.
- client_key_path: ${SATELLITE_KAFKA_CLIENT_KEY_PATH:"client.key"}
+ client_key_path: ${SATELLITE_GRPC_CLIENT_KEY_PATH:"client.key"}
+ # InsecureSkipVerify controls whether a client verifies the server's
certificate chain and host name.
+ insecure_skip_verify: ${SATELLITE_GRPC_INSECURE_SKIP_VERIFY:false}
# The file path oca.pem. The config only works when opening the TLS
switch.
- ca_pem_path: ${SATELLITE_KAFKA_CA_PEM_PATH:"ca.pem"}
+ ca_pem_path: ${SATELLITE_grpc_CA_PEM_PATH:"ca.pem"}
+ # How frequently to check the connection
+ check_period: ${SATELLITE_GRPC_CHECK_PERIOD:5}
+ # The auth value when send request
+ authentication: ${SATELLITE_GRPC_AUTHENTICATION:""}
servers:
- plugin_name: "grpc-server"
# The address of grpc server.
@@ -86,8 +90,6 @@ pipes:
max_buffer_size: ${SATELLITE_LOGPIPE_SENDER_MAX_BUFFER_SIZE:200}
# The minimum flush elements.
min_flush_events: ${SATELLITE_LOGPIPE_SENDER_MIN_FLUSH_EVENTS:100}
- client_name: kafka-client
+ client_name: grpc-client
forwarders:
- - plugin_name: nativelog-kafka-forwarder
- # The remote kafka topic.
- topic: ${SATELLITE_NATIVELOG-TOPIC:log-topic}
+ - plugin_name: nativelog-grpc-forwarder
diff --git a/docs/en/setup/plugins/client_grpc-client.md
b/docs/en/setup/plugins/client_grpc-client.md
new file mode 100755
index 0000000..27ada86
--- /dev/null
+++ b/docs/en/setup/plugins/client_grpc-client.md
@@ -0,0 +1,29 @@
+# Client/grpc-client
+## Description
+The gRPC client is a sharing plugin to keep connection with the gRPC server
and delivery the data to it.
+## DefaultConfig
+```yaml
+# The gRPC server address (default localhost:11800).
+server_addr: localhost:11800
+
+# The TLS switch (default false).
+enable_TLS: false
+
+# The file path of client.pem. The config only works when opening the TLS
switch.
+client_pem_path: ""
+
+# The file path of client.key. The config only works when opening the TLS
switch.
+client_key_path: ""
+
+# The file path oca.pem. The config only works when opening the TLS switch.
+ca_pem_path: ""
+
+# InsecureSkipVerify controls whether a client verifies the server's
certificate chain and host name.
+insecure_skip_verify: true
+
+# The auth value when send request
+authentication: ""
+
+# How frequently to check the connection
+check_period: 5
+```
diff --git a/docs/en/setup/plugins/forwarder_nativelog-grpc-forwarder.md
b/docs/en/setup/plugins/forwarder_nativelog-grpc-forwarder.md
new file mode 100755
index 0000000..14350ed
--- /dev/null
+++ b/docs/en/setup/plugins/forwarder_nativelog-grpc-forwarder.md
@@ -0,0 +1,5 @@
+# Forwarder/nativelog-grpc-forwarder
+## Description
+This is a synchronization grpc forwarder with the SkyWalking native log
protocol.
+## DefaultConfig
+```yaml```
diff --git a/docs/en/setup/plugins/plugin-list.md
b/docs/en/setup/plugins/plugin-list.md
index e200cda..6760a9e 100755
--- a/docs/en/setup/plugins/plugin-list.md
+++ b/docs/en/setup/plugins/plugin-list.md
@@ -1,5 +1,6 @@
# Plugin List
- Client
+ - [grpc-client](./client_grpc-client.md)
- [kafka-client](./client_kafka-client.md)
- Fallbacker
- [none-fallbacker](./fallbacker_none-fallbacker.md)
@@ -7,6 +8,7 @@
- Fetcher
- Filter
- Forwarder
+ - [nativelog-grpc-forwarder](./forwarder_nativelog-grpc-forwarder.md)
- [nativelog-kafka-forwarder](./forwarder_nativelog-kafka-forwarder.md)
- Parser
- Queue
diff --git a/internal/satellite/config/loader_test.go
b/internal/satellite/config/loader_test.go
index 9712998..002a606 100644
--- a/internal/satellite/config/loader_test.go
+++ b/internal/satellite/config/loader_test.go
@@ -75,82 +75,91 @@ func params() *SatelliteConfig {
Service: "service1",
Instance: "instance1",
},
- Sharing: &SharingConfig{
- SharingCommonConfig: &config.CommonFields{
- PipeName: "sharing",
+ Sharing: sharing(),
+ Pipes: pipes(),
+ }
+}
+
+func sharing() *SharingConfig {
+ return &SharingConfig{
+ SharingCommonConfig: &config.CommonFields{
+ PipeName: "sharing",
+ },
+ Clients: []plugin.Config{
+ {
+ "plugin_name": "grpc-client",
+ "server_addr": "127.0.0.1:11800",
+ "commonfields_pipe_name": "sharing",
+ "ca_pem_path": "ca.pem",
+ "client_key_path": "client.key",
+ "client_pem_path": "client.pem",
+ "enable_TLS": false,
+ "insecure_skip_verify": false,
+ "check_period": 5,
+ "authentication": "",
},
- Clients: []plugin.Config{
- {
- "plugin_name":
"kafka-client",
- "brokers":
"127.0.0.1:9092",
- "version": "2.1.1",
- "commonfields_pipe_name": "sharing",
- "ca_pem_path": "ca.pem",
- "client_key_path": "client.key",
- "client_pem_path": "client.pem",
- "enable_TLS": false,
- },
+ },
+ Servers: []plugin.Config{
+ {
+ "plugin_name": "grpc-server",
+ "commonfields_pipe_name": "sharing",
+ "address": ":11800",
+ "tls_cert_file": "",
+ "tls_key_file": "",
+ },
+ {
+ "plugin_name": "prometheus-server",
+ "address": ":1234",
+ "commonfields_pipe_name": "sharing",
+ "endpoint": "/metrics",
+ },
+ },
+ }
+}
+
+func pipes() []*PipeConfig {
+ return []*PipeConfig{
+ {
+ PipeCommonConfig: &config.CommonFields{
+ PipeName: "logpipe",
},
- Servers: []plugin.Config{
- {
- "plugin_name": "grpc-server",
- "commonfields_pipe_name": "sharing",
- "address": ":11800",
- "tls_cert_file": "",
- "tls_key_file": "",
+
+ Gatherer: &gatherer.GathererConfig{
+ ServerName: "grpc-server",
+ CommonFields: &config.CommonFields{
+ PipeName: "logpipe",
},
- {
- "plugin_name":
"prometheus-server",
- "address": ":1234",
- "commonfields_pipe_name": "sharing",
- "endpoint": "/metrics",
+ ReceiverConfig: plugin.Config{
+ "plugin_name":
"grpc-nativelog-receiver",
+ "commonfields_pipe_name": "logpipe",
+ },
+ QueueConfig: plugin.Config{
+ "commonfields_pipe_name": "logpipe",
+ "plugin_name":
"memory-queue",
+ "event_buffer_size": 5000,
},
},
- },
- Pipes: []*PipeConfig{
- {
- PipeCommonConfig: &config.CommonFields{
+ Processor: &processor.ProcessorConfig{
+ CommonFields: &config.CommonFields{
PipeName: "logpipe",
},
-
- Gatherer: &gatherer.GathererConfig{
- ServerName: "grpc-server",
- CommonFields: &config.CommonFields{
- PipeName: "logpipe",
- },
- ReceiverConfig: plugin.Config{
- "plugin_name":
"grpc-nativelog-receiver",
- "commonfields_pipe_name":
"logpipe",
- },
- QueueConfig: plugin.Config{
- "commonfields_pipe_name":
"logpipe",
- "plugin_name":
"memory-queue",
- "event_buffer_size": 5000,
- },
+ },
+ Sender: &sender.SenderConfig{
+ CommonFields: &config.CommonFields{
+ PipeName: "logpipe",
},
- Processor: &processor.ProcessorConfig{
- CommonFields: &config.CommonFields{
- PipeName: "logpipe",
- },
+ FallbackerConfig: plugin.Config{
+ "commonfields_pipe_name": "logpipe",
+ "plugin_name":
"none-fallbacker",
},
- Sender: &sender.SenderConfig{
- CommonFields: &config.CommonFields{
- PipeName: "logpipe",
- },
- FallbackerConfig: plugin.Config{
+ FlushTime: 1000,
+ MaxBufferSize: 200,
+ MinFlushEvents: 100,
+ ClientName: "grpc-client",
+ ForwardersConfig: []plugin.Config{
+ {
+ "plugin_name":
"nativelog-grpc-forwarder",
"commonfields_pipe_name":
"logpipe",
- "plugin_name":
"none-fallbacker",
- },
- FlushTime: 1000,
- MaxBufferSize: 200,
- MinFlushEvents: 100,
- ClientName: "kafka-client",
- ForwardersConfig: []plugin.Config{
- {
- "plugin_name":
"nativelog-kafka-forwarder",
- "topic":
"log-topic",
-
"commonfields_pipe_name": "logpipe",
- },
},
},
},
diff --git a/plugins/client/client_repository.go
b/plugins/client/client_repository.go
index 1300b37..3b17417 100644
--- a/plugins/client/client_repository.go
+++ b/plugins/client/client_repository.go
@@ -20,6 +20,8 @@ package client
import (
"reflect"
+ "github.com/apache/skywalking-satellite/plugins/client/grpc"
+
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
"github.com/apache/skywalking-satellite/plugins/client/api"
"github.com/apache/skywalking-satellite/plugins/client/kafka"
@@ -31,6 +33,7 @@ func RegisterClientPlugins() {
clients := []api.Client{
// Please register the client plugins at here.
new(kafka.Client),
+ new(grpc.Client),
}
for _, client := range clients {
plugin.RegisterPlugin(client)
diff --git a/plugins/client/grpc/client.go b/plugins/client/grpc/client.go
new file mode 100644
index 0000000..b8598c9
--- /dev/null
+++ b/plugins/client/grpc/client.go
@@ -0,0 +1,142 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/sirupsen/logrus"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/grpclog"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/config"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/plugins/client/api"
+)
+
+const Name = "grpc-client"
+
+type Client struct {
+ config.CommonFields
+ // config
+ ServerAddr string `mapstructure:"server_addr"` // The
gRPC server address
+ EnableTLS bool `mapstructure:"enable_TLS"` //
Enable TLS connect to server
+ ClientPemPath string `mapstructure:"client_pem_path"` // The
file path of client.pem. The config only works when opening the TLS switch.
+ ClientKeyPath string `mapstructure:"client_key_path"` // The
file path of client.key. The config only works when opening the TLS switch.
+ CaPemPath string `mapstructure:"ca_pem_path"` // The
file path oca.pem. The config only works when opening the TLS switch.
+ InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"` //
Controls whether a client verifies the server's certificate chain and host name.
+ Authentication string `mapstructure:"authentication"` // The
auth value when send request
+ CheckPeriod int `mapstructure:"check_period"` // How
frequently to check the connection
+
+ // components
+ status api.ClientStatus
+ client *grpc.ClientConn
+ listeners []chan<- api.ClientStatus
+ ctx context.Context // Parent ctx
+ cancel context.CancelFunc // Parent ctx cancel function
+}
+
+func (c *Client) Name() string {
+ return Name
+}
+
+func (c *Client) Description() string {
+ return "The gRPC client is a sharing plugin to keep connection with the
gRPC server and delivery the data to it."
+}
+
+func (c *Client) DefaultConfig() string {
+ return `
+# The gRPC server address (default localhost:11800).
+server_addr: localhost:11800
+
+# The TLS switch (default false).
+enable_TLS: false
+
+# The file path of client.pem. The config only works when opening the TLS
switch.
+client_pem_path: ""
+
+# The file path of client.key. The config only works when opening the TLS
switch.
+client_key_path: ""
+
+# The file path oca.pem. The config only works when opening the TLS switch.
+ca_pem_path: ""
+
+# InsecureSkipVerify controls whether a client verifies the server's
certificate chain and host name.
+insecure_skip_verify: true
+
+# The auth value when send request
+authentication: ""
+
+# How frequently to check the connection
+check_period: 5
+`
+}
+
+func (c *Client) Prepare() error {
+ // config
+ cfg, err := c.loadConfig()
+ if err != nil {
+ return fmt.Errorf("cannot init the grpc client: %v", err)
+ }
+
+ // logger
+
grpclog.SetLoggerV2(&logrusGrpcLoggerV2{log.Logger.WithFields(logrus.Fields{
+ "client_name": Name,
+ })})
+
+ // connect to server
+ client, err := grpc.Dial(c.ServerAddr, *cfg...)
+ if err != nil {
+ return fmt.Errorf("cannot connect to grpc server: %v", err)
+ }
+
+ c.client = client
+ c.status = api.Connected
+ c.ctx, c.cancel = context.WithCancel(context.Background())
+ c.listeners = make([]chan<- api.ClientStatus, 0)
+ return nil
+}
+
+func (c *Client) Close() error {
+ c.cancel()
+ defer log.Logger.Info("grpc client is closed")
+ return c.client.Close()
+}
+
+func (c *Client) GetConnectedClient() interface{} {
+ return c.client
+}
+
+func (c *Client) RegisterListener(listener chan<- api.ClientStatus) {
+ c.listeners = append(c.listeners, listener)
+}
+
+func (c *Client) Start() error {
+ go c.snifferChannelStatus()
+ return nil
+}
+
+// grpc log adaptor
+type logrusGrpcLoggerV2 struct {
+ *logrus.Entry
+}
+
+func (l *logrusGrpcLoggerV2) V(level int) bool {
+ return l.Logger.IsLevelEnabled(logrus.Level(level))
+}
diff --git a/plugins/client/grpc/client_config.go
b/plugins/client/grpc/client_config.go
new file mode 100644
index 0000000..5f7d49b
--- /dev/null
+++ b/plugins/client/grpc/client_config.go
@@ -0,0 +1,123 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "io/ioutil"
+ "os"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/metadata"
+)
+
+// loadConfig use the client params to build the grpc client config.
+func (c *Client) loadConfig() (*[]grpc.DialOption, error) {
+ options := make([]grpc.DialOption, 0)
+
+ if c.EnableTLS {
+ configTLS, err := c.configTLS()
+ if err != nil {
+ return nil, err
+ }
+ options = append(options,
grpc.WithTransportCredentials(credentials.NewTLS(configTLS)))
+ } else {
+ options = append(options, grpc.WithInsecure())
+ }
+
+ var authHeader metadata.MD
+ if c.Authentication != "" {
+ authHeader = metadata.New(map[string]string{"Authentication":
c.Authentication})
+ }
+
+ // append auth or report error
+ options = append(options, grpc.WithStreamInterceptor(func(ctx
context.Context, desc *grpc.StreamDesc,
+ cc *grpc.ClientConn, method string, streamer grpc.Streamer,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
+ if authHeader != nil {
+ ctx = metadata.NewOutgoingContext(ctx, authHeader)
+ }
+ clientStream, err := streamer(ctx, desc, cc, method, opts...)
+ if err != nil {
+ c.reportError(err)
+ }
+ return clientStream, err
+ }))
+ grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req,
reply interface{},
+ cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts
...grpc.CallOption) error {
+ if authHeader != nil {
+ ctx = metadata.NewOutgoingContext(ctx, authHeader)
+ }
+ err := invoker(ctx, method, req, reply, cc, opts...)
+ if err != nil {
+ c.reportError(err)
+ }
+ return err
+ })
+
+ return &options, nil
+}
+
+// configTLS loads and parse the TLS configs.
+func (c *Client) configTLS() (tc *tls.Config, tlsErr error) {
+ if err := checkTLSFile(c.CaPemPath); err != nil {
+ return nil, err
+ }
+ if err := checkTLSFile(c.ClientKeyPath); err != nil {
+ return nil, err
+ }
+ if err := checkTLSFile(c.ClientPemPath); err != nil {
+ return nil, err
+ }
+ tlsConfig := new(tls.Config)
+ tlsConfig.Renegotiation = tls.RenegotiateNever
+ tlsConfig.InsecureSkipVerify = c.InsecureSkipVerify
+ caPem, err := ioutil.ReadFile(c.CaPemPath)
+ if err != nil {
+ return nil, err
+ }
+ certPool := x509.NewCertPool()
+ certPool.AppendCertsFromPEM(caPem)
+ tlsConfig.RootCAs = certPool
+
+ clientPem, err := tls.LoadX509KeyPair(c.ClientPemPath, c.ClientKeyPath)
+ if err != nil {
+ return nil, err
+ }
+ tlsConfig.Certificates = []tls.Certificate{clientPem}
+ return tlsConfig, nil
+}
+
+// checkTLSFile checks the TLS files.
+func checkTLSFile(path string) error {
+ file, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+ stat, err := file.Stat()
+ if err != nil {
+ return err
+ }
+ if stat.Size() == 0 {
+ return fmt.Errorf("the TLS file is illegal: %s", path)
+ }
+ return nil
+}
diff --git a/plugins/client/grpc/client_sniffer.go
b/plugins/client/grpc/client_sniffer.go
new file mode 100644
index 0000000..2865c7c
--- /dev/null
+++ b/plugins/client/grpc/client_sniffer.go
@@ -0,0 +1,73 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "time"
+
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/status"
+
+ "github.com/apache/skywalking-satellite/plugins/client/api"
+)
+
+// sniffer
+func (c *Client) snifferChannelStatus() {
+ ctx, cancel := context.WithCancel(c.ctx)
+ defer cancel()
+ timeTicker := time.NewTicker(time.Duration(c.CheckPeriod) * time.Second)
+ for {
+ select {
+ case <-timeTicker.C:
+ state := c.client.GetState()
+ if state == connectivity.Shutdown || state ==
connectivity.TransientFailure {
+ c.updateStatus(api.Disconnect)
+ } else if state == connectivity.Ready || state ==
connectivity.Idle {
+ c.updateStatus(api.Connected)
+ }
+ case <-ctx.Done():
+ timeTicker.Stop()
+ return
+ }
+ }
+}
+
+func (c *Client) reportError(err error) {
+ if err == nil {
+ return
+ }
+ fromError, ok := status.FromError(err)
+ if ok {
+ errCode := fromError.Code()
+ if errCode == codes.Unavailable || errCode ==
codes.PermissionDenied ||
+ errCode == codes.Unauthenticated || errCode ==
codes.ResourceExhausted || errCode == codes.Unknown {
+ c.updateStatus(api.Disconnect)
+ }
+ }
+}
+
+func (c *Client) updateStatus(clientStatus api.ClientStatus) {
+ if c.status != clientStatus {
+ c.status = clientStatus
+ for _, listener := range c.listeners {
+ listener <- c.status
+ }
+ }
+}
diff --git a/plugins/forwarder/forwarder_repository.go
b/plugins/forwarder/forwarder_repository.go
index f69acdd..87f9dc7 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -20,9 +20,11 @@ package forwarder
import (
"reflect"
+ grpc_nativelog
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog"
+ kafka_nativelog
"github.com/apache/skywalking-satellite/plugins/forwarder/kafka/nativelog"
+
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
"github.com/apache/skywalking-satellite/plugins/forwarder/api"
-
"github.com/apache/skywalking-satellite/plugins/forwarder/kafka/nativelog"
)
// RegisterForwarderPlugins register the used filter plugins.
@@ -30,7 +32,8 @@ func RegisterForwarderPlugins() {
plugin.RegisterPluginCategory(reflect.TypeOf((*api.Forwarder)(nil)).Elem())
forwarders := []api.Forwarder{
// Please register the forwarder plugins at here.
- new(nativelog.Forwarder),
+ new(kafka_nativelog.Forwarder),
+ new(grpc_nativelog.Forwarder),
}
for _, forwarder := range forwarders {
plugin.RegisterPlugin(forwarder)
diff --git a/plugins/forwarder/grpc/nativelog/sync_forwarder.go
b/plugins/forwarder/grpc/nativelog/sync_forwarder.go
new file mode 100644
index 0000000..0daea19
--- /dev/null
+++ b/plugins/forwarder/grpc/nativelog/sync_forwarder.go
@@ -0,0 +1,101 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativelog
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "reflect"
+
+
"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+
+ "google.golang.org/grpc"
+
+ loggingv3 "skywalking/network/logging/v3"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/config"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/satellite/event"
+)
+
+const Name = "nativelog-grpc-forwarder"
+
+type Forwarder struct {
+ config.CommonFields
+
+ logClient loggingv3.LogReportServiceClient
+}
+
+func (f *Forwarder) Name() string {
+ return Name
+}
+
+func (f *Forwarder) Description() string {
+ return "This is a synchronization grpc forwarder with the SkyWalking
native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+ return ``
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+ client, ok := connection.(*grpc.ClientConn)
+ if !ok {
+ return fmt.Errorf("the %s is only accepet the grpc client, but
receive a %s",
+ f.Name(), reflect.TypeOf(connection).String())
+ }
+ f.logClient = loggingv3.NewLogReportServiceClient(client)
+ return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+ stream, err := f.logClient.Collect(context.Background())
+ if err != nil {
+ log.Logger.Errorf("open grpc stream error %v", err)
+ return err
+ }
+ for _, e := range batch {
+ data, ok := e.GetData().(*protocol.Event_Log)
+ if !ok {
+ continue
+ }
+ err := stream.Send(data.Log)
+ if err != nil {
+ log.Logger.Errorf("%s send log data error: %v",
f.Name(), err)
+ err = closeStream(stream)
+ if err != nil {
+ log.Logger.Errorf("%s close stream error: %v",
f.Name(), err)
+ }
+ return err
+ }
+ }
+ return closeStream(stream)
+}
+
+func closeStream(stream loggingv3.LogReportService_CollectClient) error {
+ _, err := stream.CloseAndRecv()
+ if err != nil && err != io.EOF {
+ return err
+ }
+ return nil
+}
+
+func (f *Forwarder) ForwardType() protocol.EventType {
+ return protocol.EventType_Logging
+}