This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-go.git
The following commit(s) were added to refs/heads/main by this push:
     new 66fe3fc  feat: impl http.Hijacker (#188)
66fe3fc is described below

commit 66fe3fcc6357cc78f0eef2be372f2a7a70e9b7a8
Author: Like <likelo...@gmail.com>
AuthorDate: Sat Jul 13 14:01:23 2024 +0800

    feat: impl http.Hijacker (#188)
---
 CHANGES.md                                     |  1 +
 plugins/mux/serve_interceptor.go               | 41 ++++++++++++++++++++++--
 test/plugins/scenarios/mux/config/excepted.yml | 24 +++++++++++---
 test/plugins/scenarios/mux/go.mod              |  5 ++-
 test/plugins/scenarios/mux/go.sum              |  2 ++
 test/plugins/scenarios/mux/main.go             | 44 ++++++++++++++++++++++++++
 6 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 83685e8..26f08f9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,7 @@ Release Notes.
 * Support http headers collection for Gin.
 * Support higher versions of grpc.
 * Support [go-elasticsearchv8](https://github.com/elastic/go-elasticsearch) database client framework.
+* Support `http.Hijacker` interface for mux plugin.
 
 ### Bug Fixes
 * Fix panic error when root span finished.
diff --git a/plugins/mux/serve_interceptor.go b/plugins/mux/serve_interceptor.go index 7078d08..01b2dee 100644 --- a/plugins/mux/serve_interceptor.go +++ b/plugins/mux/serve_interceptor.go @@ -40,8 +40,8 @@ func (n *ServeHTTPInterceptor) BeforeInvoke(invocation operator.Invocation) erro return err } - writer := invocation.Args()[0].(http.ResponseWriter) - invocation.ChangeArg(0, &writerWrapper{ResponseWriter: writer, statusCode: http.StatusOK}) + rw := newResponseWriter(invocation.Args()[0]) + invocation.ChangeArg(0, rw) invocation.SetContext(s) return nil } @@ -54,10 +54,32 @@ func (n *ServeHTTPInterceptor) AfterInvoke(invocation operator.Invocation, resul if wrapped, ok := invocation.Args()[0].(*writerWrapper); ok { span.Tag(tracing.TagStatusCode, fmt.Sprintf("%d", wrapped.statusCode)) } + if wrapped, ok := invocation.Args()[0].(*writerWrapperWithHijacker); ok { + span.Tag(tracing.TagStatusCode, fmt.Sprintf("%d", wrapped.writer.statusCode)) + } span.End() return nil } +func newResponseWriter(val interface{}) http.ResponseWriter { + var rw http.ResponseWriter + sourceWriter := val.(http.ResponseWriter) + switch val.(type) { + case http.Hijacker: + rw = newWriterWrapperWithHijacker(sourceWriter, sourceWriter.(http.Hijacker)) + default: + rw = newWriterWrapper(rw) + } + return rw +} + +func newWriterWrapper(writer http.ResponseWriter) *writerWrapper { + return &writerWrapper{ + ResponseWriter: writer, + statusCode: http.StatusOK, + } +} + type writerWrapper struct { http.ResponseWriter statusCode int @@ -68,3 +90,18 @@ func (w *writerWrapper) WriteHeader(statusCode int) { w.statusCode = statusCode w.ResponseWriter.WriteHeader(statusCode) } + +func newWriterWrapperWithHijacker(writer http.ResponseWriter, hijacker http.Hijacker) *writerWrapperWithHijacker { + wrapper := newWriterWrapper(writer) + return &writerWrapperWithHijacker{ + ResponseWriter: wrapper, + writer: wrapper, + Hijacker: hijacker, + } +} + +type writerWrapperWithHijacker struct { + http.ResponseWriter 
+ writer *writerWrapper // status code cache + http.Hijacker +} diff --git a/test/plugins/scenarios/mux/config/excepted.yml b/test/plugins/scenarios/mux/config/excepted.yml index 43b2361..670692f 100644 --- a/test/plugins/scenarios/mux/config/excepted.yml +++ b/test/plugins/scenarios/mux/config/excepted.yml @@ -16,8 +16,25 @@ segmentItems: - serviceName: mux - segmentSize: ge 3 + segmentSize: ge 4 segments: + - segmentId: not null + spans: + - operationName: GET:/ws + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 5017 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - { key: http.method, value: GET } + - { key: url, value: 'localhost:8080/ws' } + - { key: status_code, value: '200' } - segmentId: not null spans: - operationName: GET:/provider/test @@ -72,6 +89,5 @@ segmentItems: parentSpanId: 1, parentTraceSegmentId: not null, parentServiceInstance: not null, parentService: mux, traceId: not null } - -meterItems: [] -logItems: [] \ No newline at end of file +meterItems: [ ] +logItems: [ ] diff --git a/test/plugins/scenarios/mux/go.mod b/test/plugins/scenarios/mux/go.mod index 7551e44..ad02ad8 100644 --- a/test/plugins/scenarios/mux/go.mod +++ b/test/plugins/scenarios/mux/go.mod @@ -2,4 +2,7 @@ module test/plugins/scenarios/mux go 1.18 -require github.com/gorilla/mux v1.8.0 // indirect +require ( + github.com/gorilla/mux v1.8.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect +) diff --git a/test/plugins/scenarios/mux/go.sum b/test/plugins/scenarios/mux/go.sum index 5350288..ac0d4be 100644 --- a/test/plugins/scenarios/mux/go.sum +++ b/test/plugins/scenarios/mux/go.sum @@ -1,2 +1,4 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod 
h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/test/plugins/scenarios/mux/main.go b/test/plugins/scenarios/mux/main.go index 464d463..b7bc30f 100644 --- a/test/plugins/scenarios/mux/main.go +++ b/test/plugins/scenarios/mux/main.go @@ -18,9 +18,11 @@ package main import ( + "github.com/gorilla/websocket" "io" "log" "net/http" + "net/url" "time" _ "github.com/apache/skywalking-go" @@ -30,6 +32,8 @@ import ( func provider(w http.ResponseWriter, r *http.Request) { time.Sleep(time.Millisecond * 10) + // test ws + connectWs() w.Write([]byte("success")) } @@ -54,11 +58,51 @@ func health(w http.ResponseWriter, r *http.Request) { w.Write([]byte("success")) } +var upgrader = websocket.Upgrader{} + +// ws +// ISSUE: https://github.com/apache/skywalking-go/pull/188 +// Test http.ResponseWriter cast to http.Hijacker +func ws(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Print("upgrade:", err) + return + } + defer c.Close() + for { + mt, message, err := c.ReadMessage() + if err != nil { + log.Println("read:", err) + break + } + log.Printf("recv: %s", message) + err = c.WriteMessage(mt, message) + if err != nil { + log.Println("write:", err) + break + } + } +} + +func connectWs() { + u := url.URL{Scheme: "ws", Host: "localhost:8080", Path: "/ws"} + log.Printf("connecting to %s", u.String()) + c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + log.Fatal("dial:", err) + } + defer c.Close() + c.WriteMessage(websocket.TextMessage, []byte("hello from mux test")) + c.Close() +} + func main() { r := mux.NewRouter() r.Path("/health").HandlerFunc(health) r.Path("/consumer").HandlerFunc(consumer) r.PathPrefix("/provider").Path("/{var}").HandlerFunc(provider) + r.Path("/ws").HandlerFunc(ws) // Bind to a port and pass our router in log.Fatal(http.ListenAndServe(":8080", r))