This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop by this push:
     new d25b6975 Fix triple proxy problems (#548)
d25b6975 is described below

commit d25b697527f0153920d30091f69d7b5e21cef9f7
Author: shawnh2 <[email protected]>
AuthorDate: Tue Mar 21 10:26:59 2023 +0800

    Fix triple proxy problems (#548)
    
    * remove the dependency of mercari/grpc-http-proxy mod
    
    * handle the errors in client/proxy
    
    * add ability to forward header
    
    * fix linter
    
    * fix linter
    
    * add protoset loading mechanism to solve the incompatibilities within 
k8s.io proto etc
    
    * fix loading logic and lower down the grade of logger
    
    * Change protoset descriptor source to a singleton type
---
 go.mod                                             |   1 -
 .../github.com/mercari/grpc-http-proxy/LICENSE     |  22 ---
 pixiu/pkg/client/dubbo/config.go                   |   2 +
 pixiu/pkg/client/proxy/descriptor_source.go        | 156 +++++++++++++++++++++
 pixiu/pkg/client/proxy/proxy.go                    |  90 ++++++++++++
 pixiu/pkg/client/proxy/reflection.go               |  86 ++++++++++++
 pixiu/pkg/client/triple/triple.go                  |  57 +++++---
 pixiu/pkg/filter/http/remote/call.go               |  12 +-
 8 files changed, 374 insertions(+), 52 deletions(-)

diff --git a/go.mod b/go.mod
index a59953ea..e1a3f0aa 100644
--- a/go.mod
+++ b/go.mod
@@ -69,7 +69,6 @@ require (
        github.com/lestrrat-go/jwx v1.2.23
        github.com/lucas-clemente/quic-go v0.27.0
        github.com/mattn/go-isatty v0.0.14
-       github.com/mercari/grpc-http-proxy v0.1.2
        github.com/miekg/dns v1.1.48
        github.com/mitchellh/copystructure v1.2.0
        github.com/mitchellh/go-homedir v1.1.0
diff --git a/licenses/github.com/mercari/grpc-http-proxy/LICENSE 
b/licenses/github.com/mercari/grpc-http-proxy/LICENSE
deleted file mode 100644
index 1b5e3f22..00000000
--- a/licenses/github.com/mercari/grpc-http-proxy/LICENSE
+++ /dev/null
@@ -1,22 +0,0 @@
-Copyright (c) 2018 Mercari, Inc.
-
-MIT License
-
-Permission is hereby granted, free of charge, to any person obtaining
-a copy of this software and associated documentation files (the
-"Software"), to deal in the Software without restriction, including
-without limitation the rights to use, copy, modify, merge, publish,
-distribute, sublicense, and/or sell copies of the Software, and to
-permit persons to whom the Software is furnished to do so, subject to
-the following conditions:
-
-The above copyright notice and this permission notice shall be
-included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
-LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
-OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
-WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file
diff --git a/pixiu/pkg/client/dubbo/config.go b/pixiu/pkg/client/dubbo/config.go
index dbc752fb..85db5892 100644
--- a/pixiu/pkg/client/dubbo/config.go
+++ b/pixiu/pkg/client/dubbo/config.go
@@ -31,4 +31,6 @@ type DubboProxyConfig struct {
        IsDefaultMap bool
        // AutoResolve whether to resolve api config from request
        AutoResolve bool `yaml:"auto_resolve" json:"auto_resolve,omitempty"`
+       // Protoset path to load protoset files
+       Protoset []string `yaml:"protoset" json:"protoset,omitempty"`
 }
diff --git a/pixiu/pkg/client/proxy/descriptor_source.go 
b/pixiu/pkg/client/proxy/descriptor_source.go
new file mode 100644
index 00000000..3240ef02
--- /dev/null
+++ b/pixiu/pkg/client/proxy/descriptor_source.go
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains code that is copied and modified from:
+// https://github.com/fullstorydev/grpcurl/blob/v1.8.7/desc_source.go
+
+package proxy
+
+import (
+       "os"
+       "sync"
+)
+
+import (
+       "github.com/jhump/protoreflect/desc"
+       "github.com/jhump/protoreflect/dynamic"
+       "github.com/pkg/errors"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/descriptorpb"
+)
+
+import (
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/filter/http/grpcproxy"
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
+)
+
+var (
+       sourceOnce     sync.Once
+       protosetSource grpcproxy.DescriptorSource
+)
+
+func InitProtosetSource(protoset []string) {
+       sourceOnce.Do(func() {
+               var err error
+               protosetSource, err = DescriptorSourceFromProtoset(protoset)
+               if err != nil {
+                       logger.Infof("[dubbo-go-pixiu] could not load protoset 
files: %v", err)
+               }
+       })
+}
+
+// DescriptorSourceFromProtoset creates a DescriptorSource that is backed by 
the named files,
+// whose contents are Protocol Buffer source files. The given importPaths are 
used to locate
+// any imported files.
+func DescriptorSourceFromProtoset(filenames []string) 
(grpcproxy.DescriptorSource, error) {
+       if len(filenames) < 1 {
+               return nil, errors.New("no protoset files provided")
+       }
+       files := &descriptorpb.FileDescriptorSet{}
+       for _, filename := range filenames {
+               b, err := os.ReadFile(filename)
+               if err != nil {
+                       return nil, errors.Errorf("wrong path to load protoset 
file %q: %v", filename, err)
+               }
+               var fs descriptorpb.FileDescriptorSet
+               err = proto.Unmarshal(b, &fs)
+               if err != nil {
+                       return nil, errors.Errorf("could not parse contents of 
protoset file %q: %v", filename, err)
+               }
+               files.File = append(files.File, fs.File...)
+       }
+       return DescriptorSourceFromFileDescriptorSet(files)
+}
+
+// DescriptorSourceFromFileDescriptorSet creates a DescriptorSource that is 
backed by the FileDescriptorSet.
+func DescriptorSourceFromFileDescriptorSet(files 
*descriptorpb.FileDescriptorSet) (grpcproxy.DescriptorSource, error) {
+       unresolved := map[string]*descriptorpb.FileDescriptorProto{}
+       for _, fd := range files.File {
+               unresolved[fd.GetName()] = fd
+       }
+       resolved := map[string]*desc.FileDescriptor{}
+       for _, fd := range files.File {
+               _, err := resolveFileDescriptor(unresolved, resolved, 
fd.GetName())
+               if err != nil {
+                       return nil, err
+               }
+       }
+       return &protosetFileSource{files: resolved}, nil
+}
+
+func resolveFileDescriptor(unresolved 
map[string]*descriptorpb.FileDescriptorProto, resolved 
map[string]*desc.FileDescriptor, filename string) (*desc.FileDescriptor, error) 
{
+       if r, ok := resolved[filename]; ok {
+               return r, nil
+       }
+       fd, ok := unresolved[filename]
+       if !ok {
+               return nil, errors.Errorf("no descriptor found for %q", 
filename)
+       }
+       deps := make([]*desc.FileDescriptor, 0, len(fd.GetDependency()))
+       for _, dep := range fd.GetDependency() {
+               depFd, err := resolveFileDescriptor(unresolved, resolved, dep)
+               if err != nil {
+                       return nil, err
+               }
+               deps = append(deps, depFd)
+       }
+       result, err := desc.CreateFileDescriptor(fd, deps...)
+       if err != nil {
+               return nil, err
+       }
+       resolved[filename] = result
+       return result, nil
+}
+
+type protosetFileSource struct {
+       files  map[string]*desc.FileDescriptor
+       er     *dynamic.ExtensionRegistry
+       erInit sync.Once
+}
+
+func (fs *protosetFileSource) ListServices() ([]string, error) {
+       set := map[string]bool{}
+       for _, fd := range fs.files {
+               for _, svc := range fd.GetServices() {
+                       set[svc.GetFullyQualifiedName()] = true
+               }
+       }
+       sl := make([]string, 0, len(set))
+       for svc := range set {
+               sl = append(sl, svc)
+       }
+       return sl, nil
+}
+
+func (fs *protosetFileSource) FindSymbol(fullyQualifiedName string) 
(desc.Descriptor, error) {
+       for _, fd := range fs.files {
+               if dsc := fd.FindSymbol(fullyQualifiedName); dsc != nil {
+                       return dsc, nil
+               }
+       }
+       return nil, errors.Errorf("Symbol not found: %s", fullyQualifiedName)
+}
+
+func (fs *protosetFileSource) AllExtensionsForType(typeName string) 
([]*desc.FieldDescriptor, error) {
+       fs.erInit.Do(func() {
+               fs.er = &dynamic.ExtensionRegistry{}
+               for _, fd := range fs.files {
+                       fs.er.AddExtensionsFromFile(fd)
+               }
+       })
+       return fs.er.AllExtensionsForType(typeName), nil
+}
diff --git a/pixiu/pkg/client/proxy/proxy.go b/pixiu/pkg/client/proxy/proxy.go
new file mode 100644
index 00000000..6b7f45b7
--- /dev/null
+++ b/pixiu/pkg/client/proxy/proxy.go
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "context"
+       "net/url"
+)
+
+import (
+       "github.com/jhump/protoreflect/dynamic"
+       "github.com/jhump/protoreflect/dynamic/grpcdynamic"
+       "github.com/jhump/protoreflect/grpcreflect"
+       "github.com/pkg/errors"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/grpc/metadata"
+       rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
+       "google.golang.org/grpc/status"
+)
+
+type Proxy struct {
+       cc        *grpc.ClientConn
+       reflector *Reflector
+       stub      grpcdynamic.Stub
+}
+
+// NewProxy creates a new client
+func NewProxy() *Proxy {
+       return &Proxy{}
+}
+
+// Connect opens a connection to target.
+func (p *Proxy) Connect(ctx context.Context, target *url.URL) error {
+       cc, err := grpc.DialContext(ctx, target.String(), 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if err != nil {
+               return err
+       }
+       p.cc = cc
+       rc := grpcreflect.NewClient(ctx, rpb.NewServerReflectionClient(p.cc))
+       p.reflector = NewReflector(rc)
+       p.stub = grpcdynamic.NewStub(p.cc)
+       return err
+}
+
+// Call performs the gRPC call after doing reflection to obtain type 
information.
+func (p *Proxy) Call(ctx context.Context, serviceName, methodName string, 
message []byte, md *metadata.MD) ([]byte, error) {
+       invocation, err := p.reflector.CreateInvocation(ctx, serviceName, 
methodName, message)
+       if err != nil {
+               return nil, err
+       }
+
+       output, err := p.stub.InvokeRpc(ctx, invocation.MethodDescriptor, 
invocation.Message, grpc.Header(md))
+       if err != nil {
+               stat := status.Convert(err)
+               if stat.Code() == codes.Unavailable {
+                       return nil, errors.Wrap(err, "could not connect to 
backend")
+               }
+
+               return nil, errors.Wrap(err, stat.Message())
+       }
+
+       outputMessage := 
dynamic.NewMessage(invocation.MethodDescriptor.GetOutputType())
+       err = outputMessage.ConvertFrom(output)
+       if err != nil {
+               return nil, errors.Wrap(err, "response from backend could not 
be converted internally")
+       }
+
+       m, err := outputMessage.MarshalJSON()
+       if err != nil {
+               return nil, err
+       }
+       return m, err
+}
diff --git a/pixiu/pkg/client/proxy/reflection.go 
b/pixiu/pkg/client/proxy/reflection.go
new file mode 100644
index 00000000..4817e187
--- /dev/null
+++ b/pixiu/pkg/client/proxy/reflection.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+       "context"
+)
+
+import (
+       "github.com/jhump/protoreflect/desc"
+       "github.com/jhump/protoreflect/dynamic"
+       "github.com/jhump/protoreflect/grpcreflect"
+       "github.com/pkg/errors"
+)
+
+type Reflector struct {
+       c *grpcreflect.Client
+}
+
+// NewReflector creates a new Reflector from the reflection client
+func NewReflector(client *grpcreflect.Client) *Reflector {
+       return &Reflector{client}
+}
+
+// CreateInvocation creates a MethodInvocation by performing reflection
+func (r *Reflector) CreateInvocation(ctx context.Context, serviceName, 
methodName string, input []byte) (*MethodInvocation, error) {
+       var (
+               serviceDesc *desc.ServiceDescriptor
+               err         error
+       )
+
+       if protosetSource != nil {
+               // Parsing reflection through protoset files
+               var dsc desc.Descriptor
+               dsc, err = protosetSource.FindSymbol(serviceName)
+               if err != nil {
+                       return nil, errors.Errorf("service was not found 
upstream even though it should have been there: %v", err)
+               }
+
+               var ok bool
+               serviceDesc, ok = dsc.(*desc.ServiceDescriptor)
+               if !ok {
+                       return nil, errors.Errorf("target server does not 
expose service %s", serviceName)
+               }
+       } else {
+               serviceDesc, err = r.c.ResolveService(serviceName)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       methodDesc := serviceDesc.FindMethodByName(methodName)
+       if methodDesc == nil {
+               return nil, errors.New("method not found upstream")
+       }
+       inputMessage := dynamic.NewMessage(methodDesc.GetInputType())
+       err = inputMessage.UnmarshalJSON(input)
+       if err != nil {
+               return nil, err
+       }
+       return &MethodInvocation{
+               MethodDescriptor: methodDesc,
+               Message:          inputMessage,
+       }, nil
+}
+
+// MethodInvocation contains a method and a message used to invoke an RPC
+type MethodInvocation struct {
+       *desc.MethodDescriptor
+       *dynamic.Message
+}
diff --git a/pixiu/pkg/client/triple/triple.go 
b/pixiu/pkg/client/triple/triple.go
index 5ee2dd55..58578f9f 100644
--- a/pixiu/pkg/client/triple/triple.go
+++ b/pixiu/pkg/client/triple/triple.go
@@ -20,26 +20,25 @@ package triple
 import (
        "context"
        "io"
+       "net/http"
        "net/url"
        "strings"
        "sync"
 )
 
 import (
-       gerrors "github.com/mercari/grpc-http-proxy/errors"
-       proxymeta "github.com/mercari/grpc-http-proxy/metadata"
-       "github.com/mercari/grpc-http-proxy/proxy"
        "github.com/pkg/errors"
-       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/metadata"
 )
 
 import (
        "github.com/apache/dubbo-go-pixiu/pixiu/pkg/client"
+       "github.com/apache/dubbo-go-pixiu/pixiu/pkg/client/proxy"
 )
 
 // InitDefaultTripleClient init default dubbo client
-func InitDefaultTripleClient() {
-       tripleClient = NewTripleClient()
+func InitDefaultTripleClient(protoset []string) {
+       tripleClient = NewTripleClient(protoset)
 }
 
 var (
@@ -48,9 +47,10 @@ var (
 )
 
 // NewTripleClient create dubbo client
-func NewTripleClient() *Client {
+func NewTripleClient(protoset []string) *Client {
        clientOnce.Do(func() {
                tripleClient = &Client{}
+               proxy.InitProtosetSource(protoset)
        })
        return tripleClient
 }
@@ -68,40 +68,51 @@ func (tc *Client) MapParams(req *client.Request) (reqData 
interface{}, err error
 }
 
 // Close clear GenericServicePool.
-func (dc *Client) Close() error {
+func (tc *Client) Close() error {
        return nil
 }
 
 // SingletonTripleClient singleton dubbo clent
-func SingletonTripleClient() *Client {
-       return NewTripleClient()
+func SingletonTripleClient(protoset []string) *Client {
+       return NewTripleClient(protoset)
 }
 
 // Call invoke service
-func (dc *Client) Call(req *client.Request) (res interface{}, err error) {
+func (tc *Client) Call(req *client.Request) (res interface{}, err error) {
        address := 
strings.Split(req.API.IntegrationRequest.HTTPBackendConfig.URL, ":")
        p := proxy.NewProxy()
        targetURL := &url.URL{
                Scheme: address[0],
                Opaque: address[1],
        }
-       if err := p.Connect(context.Background(), targetURL); err != nil {
-               return "", errors.Errorf("connect triple server error = %s", 
err)
+       if err = p.Connect(context.Background(), targetURL); err != nil {
+               return nil, errors.Errorf("connect triple server error = %s", 
err)
        }
+
+       header := tc.forwardHeaders(req.IngressRequest.Header)
+       ctx := metadata.NewOutgoingContext(context.Background(), header)
        meta := make(map[string][]string)
-       reqData, _ := io.ReadAll(req.IngressRequest.Body)
-       ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)
+       reqData, err := io.ReadAll(req.IngressRequest.Body)
+       if err != nil {
+               return nil, errors.Errorf("read request body error = %s", err)
+       }
+
+       ctx, cancel := context.WithTimeout(ctx, req.Timeout)
        defer cancel()
-       call, err := p.Call(ctx, req.API.Method.IntegrationRequest.Interface, 
req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta))
+       call, err := p.Call(ctx, req.API.Method.IntegrationRequest.Interface, 
req.API.Method.IntegrationRequest.Method, reqData, (*metadata.MD)(&meta))
        if err != nil {
-               gerr, ok := err.(*gerrors.GRPCError)
-               if ok {
-                       statusCode := codes.Code(gerr.StatusCode)
-                       if statusCode == codes.Canceled || statusCode == 
codes.DeadlineExceeded {
-                               return "", errors.Errorf("call triple server 
timeout error = %s", err)
-                       }
-               }
                return "", errors.Errorf("call triple server error = %s", err)
        }
        return call, nil
 }
+
+// forwardHeaders specific what headers should be forwarded
+func (tc *Client) forwardHeaders(header http.Header) metadata.MD {
+       md := metadata.MD{}
+       for k, vals := range header {
+               if s := strings.ToLower(k); strings.HasPrefix(s, "tri-") {
+                       md.Set(k, vals...)
+               }
+       }
+       return md
+}
diff --git a/pixiu/pkg/filter/http/remote/call.go 
b/pixiu/pkg/filter/http/remote/call.go
index 2ba692ad..f6583539 100644
--- a/pixiu/pkg/filter/http/remote/call.go
+++ b/pixiu/pkg/filter/http/remote/call.go
@@ -104,7 +104,7 @@ func (factory *FilterFactory) Apply() error {
        factory.conf.Level = level
        // must init it at apply function
        dubbo.InitDefaultDubboClient(factory.conf.Dpc)
-       triple.InitDefaultTripleClient()
+       triple.InitDefaultTripleClient(factory.conf.Dpc.Protoset)
        return nil
 }
 
@@ -133,7 +133,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) 
filter.FilterStatus {
 
        typ := api.Method.IntegrationRequest.RequestType
 
-       cli, err := matchClient(typ)
+       cli, err := f.matchClient(typ)
        if err != nil {
                panic(err)
        }
@@ -142,7 +142,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) 
filter.FilterStatus {
        req.Timeout = c.Timeout
        resp, err := cli.Call(req)
        if err != nil {
-               logger.Errorf("[dubbo-go-pixiu] client call err:%v!", err)
+               logger.Errorf("[dubbo-go-pixiu] client call err: %v!", err)
                if strings.Contains(strings.ToLower(err.Error()), "timeout") {
                        c.SendLocalReply(http.StatusGatewayTimeout, 
[]byte(fmt.Sprintf("client call timeout err: %s", err)))
                        return filter.Stop
@@ -151,19 +151,19 @@ func (f *Filter) Decode(c *contexthttp.HttpContext) 
filter.FilterStatus {
                return filter.Stop
        }
 
-       logger.Debugf("[dubbo-go-pixiu] client call resp:%v", resp)
+       logger.Debugf("[dubbo-go-pixiu] client call resp: %v", resp)
 
        c.SourceResp = resp
        return filter.Continue
 }
 
-func matchClient(typ apiConf.RequestType) (client.Client, error) {
+func (f *Filter) matchClient(typ apiConf.RequestType) (client.Client, error) {
        switch strings.ToLower(string(typ)) {
        case string(apiConf.DubboRequest):
                return dubbo.SingletonDubboClient(), nil
        // todo @(laurence) add triple to apiConf
        case "triple":
-               return triple.SingletonTripleClient(), nil
+               return triple.SingletonTripleClient(f.conf.Dpc.Protoset), nil
        case string(apiConf.HTTPRequest):
                return clienthttp.SingletonHTTPClient(), nil
        default:

Reply via email to