This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 4278acb  Support gRPC client and transmit native log (#44)
4278acb is described below

commit 4278acb89565466d5812ab00b231493be2c35503
Author: mrproliu <[email protected]>
AuthorDate: Thu Jun 3 21:07:59 2021 +0800

    Support gRPC client and transmit native log (#44)
    
    Co-authored-by: dalekliuhan <[email protected]>
---
 CHANGES.md                                         |   4 +-
 configs/satellite_config.yaml                      |  28 ++--
 docs/en/setup/plugins/client_grpc-client.md        |  29 +++++
 .../plugins/forwarder_nativelog-grpc-forwarder.md  |   5 +
 docs/en/setup/plugins/plugin-list.md               |   2 +
 internal/satellite/config/loader_test.go           | 141 ++++++++++----------
 plugins/client/client_repository.go                |   3 +
 plugins/client/grpc/client.go                      | 142 +++++++++++++++++++++
 plugins/client/grpc/client_config.go               | 123 ++++++++++++++++++
 plugins/client/grpc/client_sniffer.go              |  73 +++++++++++
 plugins/forwarder/forwarder_repository.go          |   7 +-
 plugins/forwarder/grpc/nativelog/sync_forwarder.go | 101 +++++++++++++++
 12 files changed, 576 insertions(+), 82 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 96ea4f7..e599503 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,7 +5,9 @@ Release Notes.
 0.2.0
 ------------------
 #### Features
-Update protoc-gen-go version to 1.26.0.
+* Update protoc-gen-go version to 1.26.0.
+* Add grpc client plugin.
+* Add nativelog-grpc-forwarder plugin.
 
 #### Bug Fixes
 Fix the data race in mmap queue.
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index b79e614..8eea955 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -36,19 +36,23 @@ telemetry:
 # The sharing plugins referenced by the specific plugins in the different 
pipes.
 sharing:
   clients:
-    - plugin_name: "kafka-client"
-      # The Kafka broker addresses (default localhost:9092). Multiple values 
are separated by commas.
-      brokers: ${SATELLITE_KAFKA_CLIENT_BROKERS:127.0.0.1:9092}
-      # The Kakfa version should follow this pattern, which is 
major_minor_veryMinor_patch.
-      version: ${SATELLITE_KAFKA_VERSION:"2.1.1"}
+    - plugin_name: "grpc-client"
+      # The gRPC server address (default localhost:11800).
+      server_addr: ${SATELLITE_GRPC_CLIENT:127.0.0.1:11800}
       # The TLS switch
-      enable_TLS: ${SATELLITE_KAFKA_ENABLE_TLS:false}
+      enable_TLS: ${SATELLITE_GRPC_ENABLE_TLS:false}
       # The file path of client.pem. The config only works when opening the 
TLS switch.
-      client_pem_path: ${SATELLITE_KAFKA_CLIENT_PEM_PATH:"client.pem"}
+      client_pem_path: ${SATELLITE_GRPC_CLIENT_PEM_PATH:"client.pem"}
       # The file path of client.key. The config only works when opening the 
TLS switch.
-      client_key_path: ${SATELLITE_KAFKA_CLIENT_KEY_PATH:"client.key"}
+      client_key_path: ${SATELLITE_GRPC_CLIENT_KEY_PATH:"client.key"}
+      # InsecureSkipVerify controls whether a client verifies the server's 
certificate chain and host name.
+      insecure_skip_verify: ${SATELLITE_GRPC_INSECURE_SKIP_VERIFY:false}
       # The file path oca.pem. The config only works when opening the TLS 
switch.
-      ca_pem_path: ${SATELLITE_KAFKA_CA_PEM_PATH:"ca.pem"}
+      ca_pem_path: ${SATELLITE_GRPC_CA_PEM_PATH:"ca.pem"}
+      # How frequently to check the connection
+      check_period: ${SATELLITE_GRPC_CHECK_PERIOD:5}
+      # The auth value when send request
+      authentication: ${SATELLITE_GRPC_AUTHENTICATION:""}
   servers:
     - plugin_name: "grpc-server"
       # The address of grpc server.
@@ -86,8 +90,6 @@ pipes:
       max_buffer_size: ${SATELLITE_LOGPIPE_SENDER_MAX_BUFFER_SIZE:200}
       # The minimum flush elements.
       min_flush_events: ${SATELLITE_LOGPIPE_SENDER_MIN_FLUSH_EVENTS:100}
-      client_name: kafka-client
+      client_name: grpc-client
       forwarders:
-        - plugin_name: nativelog-kafka-forwarder
-          # The remote kafka topic.
-          topic: ${SATELLITE_NATIVELOG-TOPIC:log-topic}
+        - plugin_name: nativelog-grpc-forwarder
diff --git a/docs/en/setup/plugins/client_grpc-client.md 
b/docs/en/setup/plugins/client_grpc-client.md
new file mode 100755
index 0000000..27ada86
--- /dev/null
+++ b/docs/en/setup/plugins/client_grpc-client.md
@@ -0,0 +1,29 @@
+# Client/grpc-client
+## Description
+The gRPC client is a sharing plugin to keep connection with the gRPC server 
and delivery the data to it.
+## DefaultConfig
+```yaml
+# The gRPC server address (default localhost:11800). 
+server_addr: localhost:11800
+
+# The TLS switch (default false).
+enable_TLS: false
+
+# The file path of client.pem. The config only works when opening the TLS 
switch.
+client_pem_path: ""
+
+# The file path of client.key. The config only works when opening the TLS 
switch.
+client_key_path: ""
+
+# The file path oca.pem. The config only works when opening the TLS switch.
+ca_pem_path: ""
+
+# InsecureSkipVerify controls whether a client verifies the server's 
certificate chain and host name.
+insecure_skip_verify: true
+
+# The auth value when send request
+authentication: ""
+
+# How frequently to check the connection
+check_period: 5
+```
diff --git a/docs/en/setup/plugins/forwarder_nativelog-grpc-forwarder.md 
b/docs/en/setup/plugins/forwarder_nativelog-grpc-forwarder.md
new file mode 100755
index 0000000..14350ed
--- /dev/null
+++ b/docs/en/setup/plugins/forwarder_nativelog-grpc-forwarder.md
@@ -0,0 +1,5 @@
+# Forwarder/nativelog-grpc-forwarder
+## Description
+This is a synchronization grpc forwarder with the SkyWalking native log 
protocol.
+## DefaultConfig
+```yaml```
diff --git a/docs/en/setup/plugins/plugin-list.md 
b/docs/en/setup/plugins/plugin-list.md
index e200cda..6760a9e 100755
--- a/docs/en/setup/plugins/plugin-list.md
+++ b/docs/en/setup/plugins/plugin-list.md
@@ -1,5 +1,6 @@
 # Plugin List
 - Client
+       - [grpc-client](./client_grpc-client.md)
        - [kafka-client](./client_kafka-client.md)
 - Fallbacker
        - [none-fallbacker](./fallbacker_none-fallbacker.md)
@@ -7,6 +8,7 @@
 - Fetcher
 - Filter
 - Forwarder
+       - [nativelog-grpc-forwarder](./forwarder_nativelog-grpc-forwarder.md)
        - [nativelog-kafka-forwarder](./forwarder_nativelog-kafka-forwarder.md)
 - Parser
 - Queue
diff --git a/internal/satellite/config/loader_test.go 
b/internal/satellite/config/loader_test.go
index 9712998..002a606 100644
--- a/internal/satellite/config/loader_test.go
+++ b/internal/satellite/config/loader_test.go
@@ -75,82 +75,91 @@ func params() *SatelliteConfig {
                        Service:  "service1",
                        Instance: "instance1",
                },
-               Sharing: &SharingConfig{
-                       SharingCommonConfig: &config.CommonFields{
-                               PipeName: "sharing",
+               Sharing: sharing(),
+               Pipes:   pipes(),
+       }
+}
+
+func sharing() *SharingConfig {
+       return &SharingConfig{
+               SharingCommonConfig: &config.CommonFields{
+                       PipeName: "sharing",
+               },
+               Clients: []plugin.Config{
+                       {
+                               "plugin_name":            "grpc-client",
+                               "server_addr":            "127.0.0.1:11800",
+                               "commonfields_pipe_name": "sharing",
+                               "ca_pem_path":            "ca.pem",
+                               "client_key_path":        "client.key",
+                               "client_pem_path":        "client.pem",
+                               "enable_TLS":             false,
+                               "insecure_skip_verify":   false,
+                               "check_period":           5,
+                               "authentication":         "",
                        },
-                       Clients: []plugin.Config{
-                               {
-                                       "plugin_name":            
"kafka-client",
-                                       "brokers":                
"127.0.0.1:9092",
-                                       "version":                "2.1.1",
-                                       "commonfields_pipe_name": "sharing",
-                                       "ca_pem_path":            "ca.pem",
-                                       "client_key_path":        "client.key",
-                                       "client_pem_path":        "client.pem",
-                                       "enable_TLS":             false,
-                               },
+               },
+               Servers: []plugin.Config{
+                       {
+                               "plugin_name":            "grpc-server",
+                               "commonfields_pipe_name": "sharing",
+                               "address":                ":11800",
+                               "tls_cert_file":          "",
+                               "tls_key_file":           "",
+                       },
+                       {
+                               "plugin_name":            "prometheus-server",
+                               "address":                ":1234",
+                               "commonfields_pipe_name": "sharing",
+                               "endpoint":               "/metrics",
+                       },
+               },
+       }
+}
+
+func pipes() []*PipeConfig {
+       return []*PipeConfig{
+               {
+                       PipeCommonConfig: &config.CommonFields{
+                               PipeName: "logpipe",
                        },
-                       Servers: []plugin.Config{
-                               {
-                                       "plugin_name":            "grpc-server",
-                                       "commonfields_pipe_name": "sharing",
-                                       "address":                ":11800",
-                                       "tls_cert_file":          "",
-                                       "tls_key_file":           "",
+
+                       Gatherer: &gatherer.GathererConfig{
+                               ServerName: "grpc-server",
+                               CommonFields: &config.CommonFields{
+                                       PipeName: "logpipe",
                                },
-                               {
-                                       "plugin_name":            
"prometheus-server",
-                                       "address":                ":1234",
-                                       "commonfields_pipe_name": "sharing",
-                                       "endpoint":               "/metrics",
+                               ReceiverConfig: plugin.Config{
+                                       "plugin_name":            
"grpc-nativelog-receiver",
+                                       "commonfields_pipe_name": "logpipe",
+                               },
+                               QueueConfig: plugin.Config{
+                                       "commonfields_pipe_name": "logpipe",
+                                       "plugin_name":            
"memory-queue",
+                                       "event_buffer_size":      5000,
                                },
                        },
-               },
-               Pipes: []*PipeConfig{
-                       {
-                               PipeCommonConfig: &config.CommonFields{
+                       Processor: &processor.ProcessorConfig{
+                               CommonFields: &config.CommonFields{
                                        PipeName: "logpipe",
                                },
-
-                               Gatherer: &gatherer.GathererConfig{
-                                       ServerName: "grpc-server",
-                                       CommonFields: &config.CommonFields{
-                                               PipeName: "logpipe",
-                                       },
-                                       ReceiverConfig: plugin.Config{
-                                               "plugin_name":            
"grpc-nativelog-receiver",
-                                               "commonfields_pipe_name": 
"logpipe",
-                                       },
-                                       QueueConfig: plugin.Config{
-                                               "commonfields_pipe_name": 
"logpipe",
-                                               "plugin_name":            
"memory-queue",
-                                               "event_buffer_size":      5000,
-                                       },
+                       },
+                       Sender: &sender.SenderConfig{
+                               CommonFields: &config.CommonFields{
+                                       PipeName: "logpipe",
                                },
-                               Processor: &processor.ProcessorConfig{
-                                       CommonFields: &config.CommonFields{
-                                               PipeName: "logpipe",
-                                       },
+                               FallbackerConfig: plugin.Config{
+                                       "commonfields_pipe_name": "logpipe",
+                                       "plugin_name":            
"none-fallbacker",
                                },
-                               Sender: &sender.SenderConfig{
-                                       CommonFields: &config.CommonFields{
-                                               PipeName: "logpipe",
-                                       },
-                                       FallbackerConfig: plugin.Config{
+                               FlushTime:      1000,
+                               MaxBufferSize:  200,
+                               MinFlushEvents: 100,
+                               ClientName:     "grpc-client",
+                               ForwardersConfig: []plugin.Config{
+                                       {
+                                               "plugin_name":            
"nativelog-grpc-forwarder",
                                                "commonfields_pipe_name": 
"logpipe",
-                                               "plugin_name":            
"none-fallbacker",
-                                       },
-                                       FlushTime:      1000,
-                                       MaxBufferSize:  200,
-                                       MinFlushEvents: 100,
-                                       ClientName:     "kafka-client",
-                                       ForwardersConfig: []plugin.Config{
-                                               {
-                                                       "plugin_name":          
  "nativelog-kafka-forwarder",
-                                                       "topic":                
  "log-topic",
-                                                       
"commonfields_pipe_name": "logpipe",
-                                               },
                                        },
                                },
                        },
diff --git a/plugins/client/client_repository.go 
b/plugins/client/client_repository.go
index 1300b37..3b17417 100644
--- a/plugins/client/client_repository.go
+++ b/plugins/client/client_repository.go
@@ -20,6 +20,8 @@ package client
 import (
        "reflect"
 
+       "github.com/apache/skywalking-satellite/plugins/client/grpc"
+
        "github.com/apache/skywalking-satellite/internal/pkg/plugin"
        "github.com/apache/skywalking-satellite/plugins/client/api"
        "github.com/apache/skywalking-satellite/plugins/client/kafka"
@@ -31,6 +33,7 @@ func RegisterClientPlugins() {
        clients := []api.Client{
                // Please register the client plugins at here.
                new(kafka.Client),
+               new(grpc.Client),
        }
        for _, client := range clients {
                plugin.RegisterPlugin(client)
diff --git a/plugins/client/grpc/client.go b/plugins/client/grpc/client.go
new file mode 100644
index 0000000..b8598c9
--- /dev/null
+++ b/plugins/client/grpc/client.go
@@ -0,0 +1,142 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/sirupsen/logrus"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/grpclog"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/config"
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+       "github.com/apache/skywalking-satellite/plugins/client/api"
+)
+
+const Name = "grpc-client"
+
+type Client struct {
+       config.CommonFields
+       // config
+       ServerAddr         string `mapstructure:"server_addr"`          // The 
gRPC server address
+       EnableTLS          bool   `mapstructure:"enable_TLS"`           // 
Enable TLS connect to server
+       ClientPemPath      string `mapstructure:"client_pem_path"`      // The 
file path of client.pem. The config only works when opening the TLS switch.
+       ClientKeyPath      string `mapstructure:"client_key_path"`      // The 
file path of client.key. The config only works when opening the TLS switch.
+       CaPemPath          string `mapstructure:"ca_pem_path"`          // The 
file path oca.pem. The config only works when opening the TLS switch.
+       InsecureSkipVerify bool   `mapstructure:"insecure_skip_verify"` // 
Controls whether a client verifies the server's certificate chain and host name.
+       Authentication     string `mapstructure:"authentication"`       // The 
auth value when send request
+       CheckPeriod        int    `mapstructure:"check_period"`         // How 
frequently to check the connection
+
+       // components
+       status    api.ClientStatus
+       client    *grpc.ClientConn
+       listeners []chan<- api.ClientStatus
+       ctx       context.Context    // Parent ctx
+       cancel    context.CancelFunc // Parent ctx cancel function
+}
+
+func (c *Client) Name() string {
+       return Name
+}
+
+func (c *Client) Description() string {
+       return "The gRPC client is a sharing plugin to keep connection with the 
gRPC server and delivery the data to it."
+}
+
+func (c *Client) DefaultConfig() string {
+       return `
+# The gRPC server address (default localhost:11800). 
+server_addr: localhost:11800
+
+# The TLS switch (default false).
+enable_TLS: false
+
+# The file path of client.pem. The config only works when opening the TLS 
switch.
+client_pem_path: ""
+
+# The file path of client.key. The config only works when opening the TLS 
switch.
+client_key_path: ""
+
+# The file path oca.pem. The config only works when opening the TLS switch.
+ca_pem_path: ""
+
+# InsecureSkipVerify controls whether a client verifies the server's 
certificate chain and host name.
+insecure_skip_verify: true
+
+# The auth value when send request
+authentication: ""
+
+# How frequently to check the connection
+check_period: 5
+`
+}
+
+func (c *Client) Prepare() error {
+       // config
+       cfg, err := c.loadConfig()
+       if err != nil {
+               return fmt.Errorf("cannot init the grpc client: %v", err)
+       }
+
+       // logger
+       
grpclog.SetLoggerV2(&logrusGrpcLoggerV2{log.Logger.WithFields(logrus.Fields{
+               "client_name": Name,
+       })})
+
+       // connect to server
+       client, err := grpc.Dial(c.ServerAddr, *cfg...)
+       if err != nil {
+               return fmt.Errorf("cannot connect to grpc server: %v", err)
+       }
+
+       c.client = client
+       c.status = api.Connected
+       c.ctx, c.cancel = context.WithCancel(context.Background())
+       c.listeners = make([]chan<- api.ClientStatus, 0)
+       return nil
+}
+
+func (c *Client) Close() error {
+       c.cancel()
+       defer log.Logger.Info("grpc client is closed")
+       return c.client.Close()
+}
+
+func (c *Client) GetConnectedClient() interface{} {
+       return c.client
+}
+
+func (c *Client) RegisterListener(listener chan<- api.ClientStatus) {
+       c.listeners = append(c.listeners, listener)
+}
+
+func (c *Client) Start() error {
+       go c.snifferChannelStatus()
+       return nil
+}
+
+// grpc log adaptor
+type logrusGrpcLoggerV2 struct {
+       *logrus.Entry
+}
+
+func (l *logrusGrpcLoggerV2) V(level int) bool {
+       return l.Logger.IsLevelEnabled(logrus.Level(level))
+}
diff --git a/plugins/client/grpc/client_config.go 
b/plugins/client/grpc/client_config.go
new file mode 100644
index 0000000..5f7d49b
--- /dev/null
+++ b/plugins/client/grpc/client_config.go
@@ -0,0 +1,123 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "crypto/tls"
+       "crypto/x509"
+       "fmt"
+       "io/ioutil"
+       "os"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/metadata"
+)
+
+// loadConfig builds the gRPC dial options from the client settings.
+//
+// It selects TLS or insecure transport credentials according to EnableTLS and
+// installs a stream and a unary interceptor. Both interceptors attach the
+// configured authentication value as outgoing metadata (when it is not empty)
+// and pass every call error to reportError. An error is returned only when the
+// TLS settings cannot be loaded.
+func (c *Client) loadConfig() (*[]grpc.DialOption, error) {
+       options := make([]grpc.DialOption, 0, 3)
+
+       if c.EnableTLS {
+               configTLS, err := c.configTLS()
+               if err != nil {
+                       return nil, err
+               }
+               options = append(options, grpc.WithTransportCredentials(credentials.NewTLS(configTLS)))
+       } else {
+               options = append(options, grpc.WithInsecure())
+       }
+
+       var authHeader metadata.MD
+       if c.Authentication != "" {
+               authHeader = metadata.New(map[string]string{"Authentication": c.Authentication})
+       }
+
+       // append auth or report error on streaming calls
+       options = append(options, grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc,
+               cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+               if authHeader != nil {
+                       ctx = metadata.NewOutgoingContext(ctx, authHeader)
+               }
+               clientStream, err := streamer(ctx, desc, cc, method, opts...)
+               if err != nil {
+                       c.reportError(err)
+               }
+               return clientStream, err
+       }))
+       // The unary interceptor must be appended as well; building the option
+       // without adding it to the list would leave unary calls unauthenticated
+       // and their errors unreported.
+       options = append(options, grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{},
+               cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+               if authHeader != nil {
+                       ctx = metadata.NewOutgoingContext(ctx, authHeader)
+               }
+               err := invoker(ctx, method, req, reply, cc, opts...)
+               if err != nil {
+                       c.reportError(err)
+               }
+               return err
+       }))
+
+       return &options, nil
+}
+
+// configTLS loads and parses the TLS settings of the client.
+//
+// The CA, client key and client certificate files are first checked with
+// checkTLSFile. The CA file is then parsed into the root pool and the client
+// certificate/key pair is loaded. An error is returned when any file fails the
+// check, cannot be read, or does not contain usable PEM data.
+func (c *Client) configTLS() (tc *tls.Config, tlsErr error) {
+       if err := checkTLSFile(c.CaPemPath); err != nil {
+               return nil, err
+       }
+       if err := checkTLSFile(c.ClientKeyPath); err != nil {
+               return nil, err
+       }
+       if err := checkTLSFile(c.ClientPemPath); err != nil {
+               return nil, err
+       }
+       tlsConfig := new(tls.Config)
+       tlsConfig.Renegotiation = tls.RenegotiateNever
+       tlsConfig.InsecureSkipVerify = c.InsecureSkipVerify
+       caPem, err := ioutil.ReadFile(c.CaPemPath)
+       if err != nil {
+               return nil, err
+       }
+       certPool := x509.NewCertPool()
+       // AppendCertsFromPEM reports false when no certificate could be parsed;
+       // fail here instead of continuing with an empty root pool.
+       if !certPool.AppendCertsFromPEM(caPem) {
+               return nil, fmt.Errorf("the TLS file is illegal: %s", c.CaPemPath)
+       }
+       tlsConfig.RootCAs = certPool
+
+       clientPem, err := tls.LoadX509KeyPair(c.ClientPemPath, c.ClientKeyPath)
+       if err != nil {
+               return nil, err
+       }
+       tlsConfig.Certificates = []tls.Certificate{clientPem}
+       return tlsConfig, nil
+}
+
+// checkTLSFile verifies that the TLS file at path can be opened and is not
+// empty. It returns the open or stat error, or an "illegal" error when the
+// file has zero length.
+func checkTLSFile(path string) error {
+       file, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+       // Only the file metadata is needed; release the descriptor on every path.
+       defer file.Close()
+
+       stat, err := file.Stat()
+       if err != nil {
+               return err
+       }
+       if stat.Size() == 0 {
+               return fmt.Errorf("the TLS file is illegal: %s", path)
+       }
+       return nil
+}
diff --git a/plugins/client/grpc/client_sniffer.go 
b/plugins/client/grpc/client_sniffer.go
new file mode 100644
index 0000000..2865c7c
--- /dev/null
+++ b/plugins/client/grpc/client_sniffer.go
@@ -0,0 +1,73 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/connectivity"
+       "google.golang.org/grpc/status"
+
+       "github.com/apache/skywalking-satellite/plugins/client/api"
+)
+
+// sniffer
+func (c *Client) snifferChannelStatus() {
+       ctx, cancel := context.WithCancel(c.ctx)
+       defer cancel()
+       timeTicker := time.NewTicker(time.Duration(c.CheckPeriod) * time.Second)
+       for {
+               select {
+               case <-timeTicker.C:
+                       state := c.client.GetState()
+                       if state == connectivity.Shutdown || state == 
connectivity.TransientFailure {
+                               c.updateStatus(api.Disconnect)
+                       } else if state == connectivity.Ready || state == 
connectivity.Idle {
+                               c.updateStatus(api.Connected)
+                       }
+               case <-ctx.Done():
+                       timeTicker.Stop()
+                       return
+               }
+       }
+}
+
+func (c *Client) reportError(err error) {
+       if err == nil {
+               return
+       }
+       fromError, ok := status.FromError(err)
+       if ok {
+               errCode := fromError.Code()
+               if errCode == codes.Unavailable || errCode == 
codes.PermissionDenied ||
+                       errCode == codes.Unauthenticated || errCode == 
codes.ResourceExhausted || errCode == codes.Unknown {
+                       c.updateStatus(api.Disconnect)
+               }
+       }
+}
+
+func (c *Client) updateStatus(clientStatus api.ClientStatus) {
+       if c.status != clientStatus {
+               c.status = clientStatus
+               for _, listener := range c.listeners {
+                       listener <- c.status
+               }
+       }
+}
diff --git a/plugins/forwarder/forwarder_repository.go 
b/plugins/forwarder/forwarder_repository.go
index f69acdd..87f9dc7 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -20,9 +20,11 @@ package forwarder
 import (
        "reflect"
 
+       grpc_nativelog 
"github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativelog"
+       kafka_nativelog 
"github.com/apache/skywalking-satellite/plugins/forwarder/kafka/nativelog"
+
        "github.com/apache/skywalking-satellite/internal/pkg/plugin"
        "github.com/apache/skywalking-satellite/plugins/forwarder/api"
-       
"github.com/apache/skywalking-satellite/plugins/forwarder/kafka/nativelog"
 )
 
 // RegisterForwarderPlugins register the used filter plugins.
@@ -30,7 +32,8 @@ func RegisterForwarderPlugins() {
        
plugin.RegisterPluginCategory(reflect.TypeOf((*api.Forwarder)(nil)).Elem())
        forwarders := []api.Forwarder{
                // Please register the forwarder plugins at here.
-               new(nativelog.Forwarder),
+               new(kafka_nativelog.Forwarder),
+               new(grpc_nativelog.Forwarder),
        }
        for _, forwarder := range forwarders {
                plugin.RegisterPlugin(forwarder)
diff --git a/plugins/forwarder/grpc/nativelog/sync_forwarder.go 
b/plugins/forwarder/grpc/nativelog/sync_forwarder.go
new file mode 100644
index 0000000..0daea19
--- /dev/null
+++ b/plugins/forwarder/grpc/nativelog/sync_forwarder.go
@@ -0,0 +1,101 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package nativelog
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "reflect"
+
+       
"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+
+       "google.golang.org/grpc"
+
+       loggingv3 "skywalking/network/logging/v3"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/config"
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+       "github.com/apache/skywalking-satellite/internal/satellite/event"
+)
+
+const Name = "nativelog-grpc-forwarder"
+
+type Forwarder struct {
+       config.CommonFields
+
+       logClient loggingv3.LogReportServiceClient
+}
+
+func (f *Forwarder) Name() string {
+       return Name
+}
+
+func (f *Forwarder) Description() string {
+       return "This is a synchronization grpc forwarder with the SkyWalking 
native log protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+       return ``
+}
+
+// Prepare binds the forwarder to the shared gRPC connection.
+//
+// connection must be a *grpc.ClientConn. Any other value, including nil, is
+// rejected with an error that names the received type.
+func (f *Forwarder) Prepare(connection interface{}) error {
+       client, ok := connection.(*grpc.ClientConn)
+       if !ok {
+               // reflect.TypeOf(nil) returns a nil Type, so calling String on it
+               // would panic; describe a nil connection explicitly instead.
+               received := "nil"
+               if connection != nil {
+                       received = reflect.TypeOf(connection).String()
+               }
+               return fmt.Errorf("the %s only accepts the grpc client, but received a %s",
+                       f.Name(), received)
+       }
+       f.logClient = loggingv3.NewLogReportServiceClient(client)
+       return nil
+}
+
+// Forward opens a Collect stream and sends every log event of the batch over it.
+//
+// Events whose payload is not a log are skipped. When a send fails, the stream
+// is closed and the send error is returned, so a failed batch is never
+// reported as delivered. Otherwise the result of closing the stream is
+// returned.
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+       stream, err := f.logClient.Collect(context.Background())
+       if err != nil {
+               log.Logger.Errorf("open grpc stream error %v", err)
+               return err
+       }
+       for _, e := range batch {
+               data, ok := e.GetData().(*protocol.Event_Log)
+               if !ok {
+                       continue
+               }
+               if sendErr := stream.Send(data.Log); sendErr != nil {
+                       log.Logger.Errorf("%s send log data error: %v", f.Name(), sendErr)
+                       // Closing is best effort here; keep the send error as the
+                       // result even when the close itself succeeds.
+                       if closeErr := closeStream(stream); closeErr != nil {
+                               log.Logger.Errorf("%s close stream error: %v", f.Name(), closeErr)
+                       }
+                       return sendErr
+               }
+       }
+       return closeStream(stream)
+}
+
+func closeStream(stream loggingv3.LogReportService_CollectClient) error {
+       _, err := stream.CloseAndRecv()
+       if err != nil && err != io.EOF {
+               return err
+       }
+       return nil
+}
+
+func (f *Forwarder) ForwardType() protocol.EventType {
+       return protocol.EventType_Logging
+}

Reply via email to