This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 18f556c  feat: add http logging receiver (#16)
18f556c is described below

commit 18f556c852800fa722ce17e3073cf328bdccfa38
Author: kv <gxt...@163.com>
AuthorDate: Thu Jan 21 08:35:43 2021 +0800

    feat: add http logging receiver (#16)
---
 go.sum                                 |   2 +
 plugins/queue/mmap/queue.go            |   1 -
 plugins/receiver/http/receiver.go      | 121 +++++++++++++++++++++
 plugins/receiver/http/receiver_test.go | 185 +++++++++++++++++++++++++++++++++
 plugins/server/http/server.go          |  71 +++++++++++++
 5 files changed, 379 insertions(+), 1 deletion(-)

diff --git a/go.sum b/go.sum
index b4f00e9..9826328 100644
--- a/go.sum
+++ b/go.sum
@@ -202,6 +202,7 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p
 github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
+github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
 github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
@@ -481,6 +482,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index 22b76b0..46aecd6 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -28,7 +28,6 @@ import (
        "time"
 
        "github.com/grandecola/mmap"
-
        "google.golang.org/protobuf/proto"
 
        "github.com/apache/skywalking-satellite/internal/pkg/config"
diff --git a/plugins/receiver/http/receiver.go b/plugins/receiver/http/receiver.go
new file mode 100644
index 0000000..20fbc22
--- /dev/null
+++ b/plugins/receiver/http/receiver.go
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package http
+
+import (
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "time"
+
+       logging "skywalking/network/logging/v3"
+
+       "google.golang.org/protobuf/proto"
+
+       "encoding/json"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/config"
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+       http_server "github.com/apache/skywalking-satellite/plugins/server/http"
+       "github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+// Identifiers and fixed values used by the http log receiver.
+const (
+       // Name is the plugin name returned by Receiver.Name.
+       Name      = "http-log-receiver"
+       // eventName is the Name assigned to every event built by the handler.
+       eventName = "http-log-event"
+       // timeout is the per-request limit handed to http.TimeoutHandler.
+       timeout   = 5 * time.Second
+       // Success is the Response.Status written when a log was accepted.
+       Success   = "success"
+       // Failing is the Response.Status written when a request was rejected.
+       Failing   = "failing"
+)
+
+// Receiver is the "http-log-receiver" plugin: it mounts an HTTP handler on an
+// http server plugin and publishes every decoded log as an event on
+// OutputChannel.
+type Receiver struct {
+       config.CommonFields
+       // Server is the http server plugin the handler is registered on; it is
+       // assigned by RegisterHandler.
+       Server        *http_server.Server
+       // OutputChannel carries the received events; RegisterHandler creates it
+       // unbuffered.
+       OutputChannel chan *protocol.Event
+       // URI is the pattern the handler is registered under on the server mux.
+       URI           string `mapstructure:"uri"`
+}
+
+// Response is the JSON body the receiver writes back for every request.
+type Response struct {
+       // Status is either Success or Failing.
+       Status string `json:"status"`
+       // Msg is Success for an accepted request, otherwise the error text.
+       Msg    string `json:"msg"`
+}
+
+// Name returns the plugin name, i.e. the Name constant of this package.
+func (r *Receiver) Name() string {
+       return Name
+}
+
+func (r *Receiver) Description() string {
+       return "This is a receiver for SkyWalking http logging format, " +
+               "which is defined at 
https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto.";
+}
+
+// DefaultConfig returns the default configuration text of the receiver. It
+// sets only the "uri" key, which maps onto the URI field.
+func (r *Receiver) DefaultConfig() string {
+       return `
+# The http server uri .
+uri: "/logging"
+`
+}
+
+// RegisterHandler attaches the receiver to the given http server plugin.
+//
+// server must be a *http_server.Server; any other dynamic type makes the type
+// assertion panic. The call also creates the unbuffered output channel and
+// mounts the log handler under r.URI on the server's mux.
+func (r *Receiver) RegisterHandler(server interface{}) {
+       httpSrv := server.(*http_server.Server)
+       r.Server = httpSrv
+       r.OutputChannel = make(chan *protocol.Event)
+       httpSrv.Server.Handle(r.URI, httpHandler(r))
+}
+
+// ResponseWithJSON sends code as the HTTP status and then writes response to
+// rsp encoded as JSON.
+//
+// Headers such as Content-Type must be set by the caller beforehand, because
+// WriteHeader is called first. An encoding or write failure is logged; it
+// cannot be reported to the client any more at that point.
+func ResponseWithJSON(rsp http.ResponseWriter, response *Response, code int) {
+       rsp.WriteHeader(code)
+       if err := json.NewEncoder(rsp).Encode(response); err != nil {
+               log.Logger.Errorf("write http response error: %v", err)
+       }
+}
+
+// httpHandler builds the handler that accepts one protobuf-encoded
+// logging.LogData per request and forwards it as an event on r.OutputChannel.
+//
+// Every answer is a JSON Response. A body that cannot be read or decoded is
+// rejected with 400 Bad Request. The whole handler is wrapped in
+// http.TimeoutHandler, so a request that takes longer than timeout is answered
+// by the wrapper instead.
+func httpHandler(r *Receiver) http.Handler {
+       h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
+               rsp.Header().Set("Content-Type", "application/json")
+               b, err := ioutil.ReadAll(req.Body)
+               if err != nil {
+                       log.Logger.Errorf("get http body error: %v", err)
+                       response := &Response{Status: Failing, Msg: err.Error()}
+                       ResponseWithJSON(rsp, response, http.StatusBadRequest)
+                       return
+               }
+               var data logging.LogData
+               if err = proto.Unmarshal(b, &data); err != nil {
+                       // A payload that is not valid protobuf is a client error, not a
+                       // server fault, so it is answered with 400 rather than 500.
+                       log.Logger.Errorf("unmarshal http body error: %v", err)
+                       response := &Response{Status: Failing, Msg: err.Error()}
+                       ResponseWithJSON(rsp, response, http.StatusBadRequest)
+                       return
+               }
+               e := &protocol.Event{
+                       Name:      eventName,
+                       Timestamp: time.Now().UnixNano() / 1e6, // milliseconds since epoch
+                       Meta:      nil,
+                       Type:      protocol.EventType_Logging,
+                       Remote:    true,
+                       Data: &protocol.Event_Log{
+                               Log: &data,
+                       },
+               }
+               // The channel is unbuffered, so the send waits for a consumer. Stop
+               // waiting once the request context ends (timeout of the wrapping
+               // TimeoutHandler or client disconnect); otherwise this goroutine
+               // would stay blocked forever.
+               select {
+               case r.OutputChannel <- e:
+               case <-req.Context().Done():
+                       log.Logger.Errorf("discard http log event: %v", req.Context().Err())
+                       return
+               }
+               response := &Response{Status: Success, Msg: Success}
+               ResponseWithJSON(rsp, response, http.StatusOK)
+       })
+       return http.TimeoutHandler(h, timeout, fmt.Sprintf("Exceeded configured timeout of %v \n", timeout))
+}
+
+// Channel returns the receive-only view of OutputChannel. The channel is nil
+// until RegisterHandler has been called.
+func (r *Receiver) Channel() <-chan *protocol.Event {
+       return r.OutputChannel
+}
diff --git a/plugins/receiver/http/receiver_test.go b/plugins/receiver/http/receiver_test.go
new file mode 100644
index 0000000..a2ece41
--- /dev/null
+++ b/plugins/receiver/http/receiver_test.go
@@ -0,0 +1,185 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package http
+
+import (
+       "bytes"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+       "reflect"
+       "strconv"
+       "testing"
+       "time"
+
+       common "skywalking/network/common/v3"
+       logging "skywalking/network/logging/v3"
+
+       "github.com/google/go-cmp/cmp"
+       "google.golang.org/protobuf/proto"
+
+       "encoding/json"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+       _ "github.com/apache/skywalking-satellite/internal/satellite/test"
+       receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
+       server "github.com/apache/skywalking-satellite/plugins/server/api"
+       httpserver "github.com/apache/skywalking-satellite/plugins/server/http"
+       "github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+// TestReceiver_http_RegisterHandler posts ten protobuf-encoded logs to the
+// receiver and checks that each one arrives unchanged on the receiver channel
+// and is answered with a Success response.
+func TestReceiver_http_RegisterHandler(t *testing.T) {
+       Init()
+       r := initReceiver(make(plugin.Config), t)
+       s := initServer(make(plugin.Config), t)
+       r.RegisterHandler(s.GetServer())
+       if err := s.Start(); err != nil {
+               t.Fatalf("cannot start the http server: %v", err)
+       }
+       // Give the listener goroutine time to come up before sending requests.
+       time.Sleep(time.Second)
+       defer func() {
+               if err := s.Close(); err != nil {
+                       t.Fatalf("cannot close the http sever: %v", err)
+               }
+       }()
+       client := http.Client{Timeout: 5 * time.Second}
+       for i := 0; i < 10; i++ {
+               data := initData(i)
+               dataBytes, err := proto.Marshal(data)
+               if err != nil {
+                       t.Fatalf("cannot marshal the data: %v", err)
+               }
+               // The handler blocks until the event is read from the channel, so the
+               // request runs in its own goroutine and reports its outcome on done.
+               done := make(chan error, 1)
+               go func() {
+                       resp, err := client.Post("http://localhost:8080/logging", "application/json", bytes.NewBuffer(dataBytes))
+                       if err != nil {
+                               done <- fmt.Errorf("cannot request the http-server , error: %v", err)
+                               return
+                       }
+                       defer resp.Body.Close()
+                       result, err := ioutil.ReadAll(resp.Body)
+                       if err != nil {
+                               done <- fmt.Errorf("cannot get response from request, error: %v ", err)
+                               return
+                       }
+                       var response Response
+                       if err := json.Unmarshal(result, &response); err != nil {
+                               done <- fmt.Errorf("cannot decode the response %q: %v", result, err)
+                               return
+                       }
+                       if !cmp.Equal(response.Status, Success) {
+                               done <- fmt.Errorf("the response should be success, but got %q", response.Status)
+                               return
+                       }
+                       done <- nil
+               }()
+
+               select {
+               case newData := <-r.Channel():
+                       if !cmp.Equal(newData.Data.(*protocol.Event_Log).Log.String(), data.String()) {
+                               t.Fatalf("the sent data is not equal to the received data\n, "+
+                                       "want data %s\n, but got %s\n", data.String(), newData.String())
+                       }
+               case err := <-done:
+                       // A successful answer is only written after the event was
+                       // consumed, so finishing first means the request failed.
+                       t.Fatalf("the request finished before any event was received: %v", err)
+               }
+               if err := <-done; err != nil {
+                       t.Fatal(err)
+               }
+       }
+}
+
+// TestReceiver_http_RegisterHandler_failed posts a JSON-encoded log, which is
+// not the protobuf encoding the handler decodes, and checks that the receiver
+// answers with a Failing response.
+func TestReceiver_http_RegisterHandler_failed(t *testing.T) {
+       Init()
+       r := initReceiver(make(plugin.Config), t)
+       s := initServer(make(plugin.Config), t)
+       r.RegisterHandler(s.GetServer())
+       if err := s.Start(); err != nil {
+               t.Fatalf("cannot start the http server: %v", err)
+       }
+       // Give the listener goroutine time to come up before sending the request.
+       time.Sleep(time.Second)
+       defer func() {
+               if err := s.Close(); err != nil {
+                       t.Fatalf("cannot close the http sever: %v", err)
+               }
+       }()
+       data := initData(0)
+       dataBytes, err := json.Marshal(data)
+       if err != nil {
+               t.Fatalf("cannot marshal the data: %v", err)
+       }
+       client := http.Client{Timeout: 5 * time.Second}
+       resp, err := client.Post("http://localhost:8080/logging", "application/json", bytes.NewBuffer(dataBytes))
+       if err != nil {
+               // resp is nil here, so the test must stop instead of touching it.
+               t.Fatalf("cannot request the http-server , error: %v", err)
+       }
+       defer resp.Body.Close()
+       result, err := ioutil.ReadAll(resp.Body)
+       if err != nil {
+               t.Fatalf("cannot get response from request, error: %v ", err)
+       }
+       var response Response
+       if err := json.Unmarshal(result, &response); err != nil {
+               t.Fatalf("cannot decode the response %q: %v", result, err)
+       }
+       if !cmp.Equal(response.Status, Failing) {
+               t.Fatalf("the response should be failing, but got %q", response.Status)
+       }
+}
+
+// initData builds a mock log record. The decimal form of sequence is appended
+// to every string field so that records of different sequences differ.
+func initData(sequence int) *logging.LogData {
+       suffix := strconv.Itoa(sequence)
+       traceCtx := &logging.TraceContext{
+               TraceSegmentId: "mock-segmentId" + suffix,
+               TraceId:        "mock-traceId" + suffix,
+               SpanId:         1,
+       }
+       tag := &common.KeyStringValuePair{
+               Key:   "mock-key" + suffix,
+               Value: "mock-value" + suffix,
+       }
+       text := &logging.TextLog{Text: "this is a mock text mock log" + suffix}
+       body := &logging.LogDataBody{
+               Type:    "mock-type" + suffix,
+               Content: &logging.LogDataBody_Text{Text: text},
+       }
+       return &logging.LogData{
+               Timestamp:       time.Now().Unix(),
+               Service:         "demo-service" + suffix,
+               ServiceInstance: "demo-instance" + suffix,
+               Endpoint:        "demo-endpoint" + suffix,
+               TraceContext:    traceCtx,
+               Tags:            []*common.KeyStringValuePair{tag},
+               Body:            body,
+       }
+}
+
+// Init registers the server and receiver plugin categories and then the http
+// server and http log receiver plugins, so the tests can look them up.
+func Init() {
+       categories := []reflect.Type{
+               reflect.TypeOf((*server.Server)(nil)).Elem(),
+               reflect.TypeOf((*receiver.Receiver)(nil)).Elem(),
+       }
+       for _, category := range categories {
+               plugin.RegisterPluginCategory(category)
+       }
+       plugin.RegisterPlugin(new(httpserver.Server))
+       plugin.RegisterPlugin(new(Receiver))
+}
+
+// initServer looks up the http server plugin configured by cfg and prepares
+// it. It sets the plugin name in cfg itself and fails the test when the plugin
+// is not registered or cannot be prepared.
+func initServer(cfg plugin.Config, t *testing.T) server.Server {
+       t.Helper() // report failures at the caller's line
+       cfg[plugin.NameField] = httpserver.Name
+       q := server.GetServer(cfg)
+       if q == nil {
+               t.Fatalf("cannot get a http server from the registry")
+       }
+       if err := q.Prepare(); err != nil {
+               t.Fatalf("cannot prepare the http server: %v", err)
+       }
+       return q
+}
+
+// initReceiver looks up the http log receiver plugin configured by cfg. It
+// sets the plugin name in cfg itself and fails the test when the plugin is not
+// registered.
+func initReceiver(cfg plugin.Config, t *testing.T) receiver.Receiver {
+       t.Helper() // report failures at the caller's line
+       // Use the package constant so the lookup cannot drift from Receiver.Name.
+       cfg[plugin.NameField] = Name
+       q := receiver.GetReceiver(cfg)
+       if q == nil {
+               t.Fatalf("cannot get http-log-receiver from the registry")
+       }
+       return q
+}
diff --git a/plugins/server/http/server.go b/plugins/server/http/server.go
new file mode 100644
index 0000000..7d93382
--- /dev/null
+++ b/plugins/server/http/server.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package http
+
+import (
+       "context"
+       "net"
+       "net/http"
+       "time"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/config"
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+)
+
+// Name is the plugin name returned by Server.Name.
+const Name = "http-server"
+
+// shutdownTimeout bounds how long Close waits for in-flight requests.
+const shutdownTimeout = 5 * time.Second
+
+// Server is the "http-server" plugin. It owns a request multiplexer that
+// other plugins register handlers on and serves it on Address.
+type Server struct {
+       config.CommonFields
+       Address string         `mapstructure:"address"`
+       Server  *http.ServeMux // The http server.
+
+       // httpServer is the running server created by Start; nil before Start.
+       httpServer *http.Server
+}
+
+// Name returns the plugin name, i.e. the Name constant of this package.
+func (s *Server) Name() string {
+       return Name
+}
+
+// Description returns a short human-readable summary of the plugin.
+func (s *Server) Description() string {
+       return "this is a http server for receive logs."
+}
+
+// DefaultConfig returns the default configuration text of the server. It sets
+// only the "address" key, which maps onto the Address field.
+func (s *Server) DefaultConfig() string {
+       return `
+# The http server address.
+address: ":8080"
+`
+}
+
+// Prepare creates the request multiplexer. It must run before handlers are
+// registered and before Start.
+func (s *Server) Prepare() error {
+       s.Server = http.NewServeMux()
+       return nil
+}
+
+// Start binds Address and serves the multiplexer in a background goroutine.
+//
+// The listener is opened synchronously so that a bind failure (for example an
+// address that is already in use) is returned to the caller instead of only
+// being logged. Errors that end serving later are logged.
+func (s *Server) Start() error {
+       addr := s.Address
+       if addr == "" {
+               // Same default http.ListenAndServe applies to an empty address.
+               addr = ":http"
+       }
+       ln, err := net.Listen("tcp", addr)
+       if err != nil {
+               return err
+       }
+       srv := &http.Server{Handler: s.Server}
+       s.httpServer = srv
+       go func() {
+               // ErrServerClosed is the regular result after Close, not a failure.
+               if err := srv.Serve(ln); err != nil && err != http.ErrServerClosed {
+                       log.Logger.Errorf("start http server error: %v", err)
+               }
+       }()
+       return nil
+}
+
+// Close stops the server started by Start and releases its listener.
+//
+// In-flight requests get up to shutdownTimeout to finish; if they do not, the
+// remaining connections are closed forcibly and the shutdown error is
+// returned. Close is a no-op when Start has not been called.
+func (s *Server) Close() error {
+       if s.httpServer == nil {
+               return nil
+       }
+       ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
+       defer cancel()
+       if err := s.httpServer.Shutdown(ctx); err != nil {
+               // Best effort: the shutdown error is what the caller needs to see.
+               _ = s.httpServer.Close()
+               return err
+       }
+       return nil
+}
+
+// GetServer returns the plugin itself (a *Server) so that receivers can
+// register handlers on its multiplexer.
+func (s *Server) GetServer() interface{} {
+       return s
+}

Reply via email to