This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push: new 18f556c feat: add http logging receiver (#16) 18f556c is described below commit 18f556c852800fa722ce17e3073cf328bdccfa38 Author: kv <gxt...@163.com> AuthorDate: Thu Jan 21 08:35:43 2021 +0800 feat: add http logging receiver (#16) --- go.sum | 2 + plugins/queue/mmap/queue.go | 1 - plugins/receiver/http/receiver.go | 121 +++++++++++++++++++++ plugins/receiver/http/receiver_test.go | 185 +++++++++++++++++++++++++++++++++ plugins/server/http/server.go | 71 +++++++++++++ 5 files changed, 379 insertions(+), 1 deletion(-) diff --git a/go.sum b/go.sum index b4f00e9..9826328 100644 --- a/go.sum +++ b/go.sum @@ -202,6 +202,7 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= +github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= @@ -481,6 +482,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go index 22b76b0..46aecd6 100644 --- a/plugins/queue/mmap/queue.go +++ b/plugins/queue/mmap/queue.go @@ -28,7 +28,6 @@ import ( "time" "github.com/grandecola/mmap" - "google.golang.org/protobuf/proto" "github.com/apache/skywalking-satellite/internal/pkg/config" diff --git a/plugins/receiver/http/receiver.go b/plugins/receiver/http/receiver.go new file mode 100644 index 0000000..20fbc22 --- /dev/null +++ b/plugins/receiver/http/receiver.go @@ -0,0 +1,121 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package http + +import ( + "fmt" + "io/ioutil" + "net/http" + "time" + + logging "skywalking/network/logging/v3" + + "google.golang.org/protobuf/proto" + + "encoding/json" + + "github.com/apache/skywalking-satellite/internal/pkg/config" + "github.com/apache/skywalking-satellite/internal/pkg/log" + http_server "github.com/apache/skywalking-satellite/plugins/server/http" + "github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol" +) + +const ( + Name = "http-log-receiver" + eventName = "http-log-event" + timeout = 5 * time.Second + Success = "success" + Failing = "failing" +) + +type Receiver struct { + config.CommonFields + Server *http_server.Server + OutputChannel chan *protocol.Event + URI string `mapstructure:"uri"` +} + +type Response struct { + Status string `json:"status"` + Msg string `json:"msg"` +} + +func (r *Receiver) Name() string { + return Name +} + +func (r *Receiver) Description() string { + return "This is a receiver for SkyWalking http logging format, " + + "which is defined at https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto." +} + +func (r *Receiver) DefaultConfig() string { + return ` +# The http server uri . +uri: "/logging" +` +} + +func (r *Receiver) RegisterHandler(server interface{}) { + r.Server = server.(*http_server.Server) + r.OutputChannel = make(chan *protocol.Event) + r.Server.Server.Handle(r.URI, httpHandler(r)) +} + +func ResponseWithJSON(rsp http.ResponseWriter, response *Response, code int) { + rsp.WriteHeader(code) + _ = json.NewEncoder(rsp).Encode(response) +} + +func httpHandler(r *Receiver) http.Handler { + h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) { + rsp.Header().Set("Content-Type", "application/json") + b, err := ioutil.ReadAll(req.Body) + if err != nil { + log.Logger.Errorf("get http body error: %v", err) + response := &Response{Status: Failing, Msg: err.Error()} + ResponseWithJSON(rsp, response, http.StatusBadRequest) + return + } + var data logging.LogData + err = proto.Unmarshal(b, &data) + if err != nil { + response := &Response{Status: Failing, Msg: err.Error()} + ResponseWithJSON(rsp, response, http.StatusInternalServerError) + return + } + e := &protocol.Event{ + Name: eventName, + Timestamp: time.Now().UnixNano() / 1e6, + Meta: nil, + Type: protocol.EventType_Logging, + Remote: true, + Data: &protocol.Event_Log{ + Log: &data, + }, + } + r.OutputChannel <- e + response := &Response{Status: Success, Msg: Success} + ResponseWithJSON(rsp, response, http.StatusOK) + }) + return http.TimeoutHandler(h, timeout, fmt.Sprintf("Exceeded configured timeout of %v \n", timeout)) +} + +func (r *Receiver) Channel() <-chan *protocol.Event { + return r.OutputChannel +} diff --git a/plugins/receiver/http/receiver_test.go b/plugins/receiver/http/receiver_test.go new file mode 100644 index 0000000..a2ece41 --- /dev/null +++ b/plugins/receiver/http/receiver_test.go @@ -0,0 +1,185 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package http + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "reflect" + "strconv" + "testing" + "time" + + common "skywalking/network/common/v3" + logging "skywalking/network/logging/v3" + + "github.com/google/go-cmp/cmp" + "google.golang.org/protobuf/proto" + + "encoding/json" + + "github.com/apache/skywalking-satellite/internal/pkg/plugin" + _ "github.com/apache/skywalking-satellite/internal/satellite/test" + receiver "github.com/apache/skywalking-satellite/plugins/receiver/api" + server "github.com/apache/skywalking-satellite/plugins/server/api" + httpserver "github.com/apache/skywalking-satellite/plugins/server/http" + "github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol" +) + +func TestReceiver_http_RegisterHandler(t *testing.T) { + Init() + r := initReceiver(make(plugin.Config), t) + s := initServer(make(plugin.Config), t) + r.RegisterHandler(s.GetServer()) + err := s.Start() + if err != nil { + t.Fatalf(err.Error()) + } + time.Sleep(time.Second) + defer func() { + if err := s.Close(); err != nil { + t.Fatalf("cannot close the http sever: %v", err) + } + }() + for i := 0; i < 10; i++ { + data := initData(i) + dataBytes, err := proto.Marshal(data) + client := http.Client{Timeout: 5 * time.Second} + if err != nil { + t.Fatalf("cannot marshal the data: %v", err) + } + go func() { + resp, err := client.Post("http://localhost:8080/logging", "application/json", bytes.NewBuffer(dataBytes)) + if err != nil { + fmt.Printf("cannot request the http-server , error: %v", err) + } + defer resp.Body.Close() + result, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Printf("cannot get response from request, error: %v ", err.Error()) + } + var response Response + _ = json.Unmarshal(result, &response) + if !cmp.Equal(response.Status, Success) { + panic("the response should be success, but failing") + } + }() + + newData := <-r.Channel() + if !cmp.Equal(newData.Data.(*protocol.Event_Log).Log.String(), data.String()) { + t.Fatalf("the sent data is not equal to the received data\n, "+ + "want data %s\n, but got %s\n", data.String(), newData.String()) + } + } +} + +func TestReceiver_http_RegisterHandler_failed(t *testing.T) { + Init() + r := initReceiver(make(plugin.Config), t) + s := initServer(make(plugin.Config), t) + r.RegisterHandler(s.GetServer()) + err := s.Start() + if err != nil { + t.Fatalf(err.Error()) + } + time.Sleep(time.Second) + defer func() { + if err = s.Close(); err != nil { + t.Fatalf("cannot close the http sever: %v", err) + } + }() + data := initData(0) + dataBytes, err := json.Marshal(data) + client := http.Client{Timeout: 5 * time.Second} + if err != nil { + t.Fatalf("cannot marshal the data: %v", err) + } + resp, err := client.Post("http://localhost:8080/logging", "application/json", bytes.NewBuffer(dataBytes)) + if err != nil { + fmt.Printf("cannot request the http-server , error: %v", err) + } + defer resp.Body.Close() + result, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Printf("cannot get response from request, error: %v ", err.Error()) + } + var response Response + _ = json.Unmarshal(result, &response) + if !cmp.Equal(response.Status, Failing) { + panic("the response should be failing, but success") + } +} + +func initData(sequence int) *logging.LogData { + seq := strconv.Itoa(sequence) + return &logging.LogData{ + Timestamp: time.Now().Unix(), + Service: "demo-service" + seq, + ServiceInstance: "demo-instance" + seq, + Endpoint: "demo-endpoint" + seq, + TraceContext: &logging.TraceContext{ + TraceSegmentId: "mock-segmentId" + seq, + TraceId: "mock-traceId" + seq, + SpanId: 1, + }, + Tags: []*common.KeyStringValuePair{ + { + Key: "mock-key" + seq, + Value: "mock-value" + seq, + }, + }, + Body: &logging.LogDataBody{ + Type: "mock-type" + seq, + Content: &logging.LogDataBody_Text{ + Text: &logging.TextLog{ + Text: "this is a mock text mock log" + seq, + }, + }, + }, + } +} + +func Init() { + plugin.RegisterPluginCategory(reflect.TypeOf((*server.Server)(nil)).Elem()) + plugin.RegisterPluginCategory(reflect.TypeOf((*receiver.Receiver)(nil)).Elem()) + plugin.RegisterPlugin(new(httpserver.Server)) + plugin.RegisterPlugin(new(Receiver)) +} + +func initServer(cfg plugin.Config, t *testing.T) server.Server { + cfg[plugin.NameField] = httpserver.Name + q := server.GetServer(cfg) + if q == nil { + t.Fatalf("cannot get a http server from the registry") + } + if err := q.Prepare(); err != nil { + t.Fatalf("cannot perpare the http server: %v", err) + } + return q +} + +func initReceiver(cfg plugin.Config, t *testing.T) receiver.Receiver { + cfg[plugin.NameField] = "http-log-receiver" + q := receiver.GetReceiver(cfg) + if q == nil { + t.Fatalf("cannot get http-log-receiver from the registry") + } + return q +} diff --git a/plugins/server/http/server.go b/plugins/server/http/server.go new file mode 100644 index 0000000..7d93382 --- /dev/null +++ b/plugins/server/http/server.go @@ -0,0 +1,71 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package http + +import ( + "net/http" + + "github.com/apache/skywalking-satellite/internal/pkg/config" + "github.com/apache/skywalking-satellite/internal/pkg/log" +) + +const Name = "http-server" + +type Server struct { + config.CommonFields + Address string `mapstructure:"address"` + Server *http.ServeMux // The http server. +} + +func (s *Server) Name() string { + return Name +} + +func (s *Server) Description() string { + return "this is a http server for receive logs." +} + +func (s *Server) DefaultConfig() string { + return ` +# The http server address. +address: ":8080" +` +} + +func (s *Server) Prepare() error { + s.Server = http.NewServeMux() + return nil +} + +func (s *Server) Start() error { + go func() { + err := http.ListenAndServe(s.Address, s.Server) + if err != nil { + log.Logger.Errorf("start http server error: %v", err) + } + }() + return nil +} + +func (s *Server) Close() error { + return nil +} + +func (s *Server) GetServer() interface{} { + return s +}