This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new d25b6975 Fix triple proxy problems (#548)
d25b6975 is described below
commit d25b697527f0153920d30091f69d7b5e21cef9f7
Author: shawnh2 <[email protected]>
AuthorDate: Tue Mar 21 10:26:59 2023 +0800
Fix triple proxy problems (#548)
* remove the dependency of mercari/grpc-http-proxy mod
* handle the errors in client/proxy
* add ability to forward header
* fix linter
* fix linter
* add protoset loading mechanism to solve the incompatibilities within
k8s.io proto etc
* fix loading logic and lower down the grade of logger
* Change protoset descriptor source to a singleton type
---
go.mod | 1 -
.../github.com/mercari/grpc-http-proxy/LICENSE | 22 ---
pixiu/pkg/client/dubbo/config.go | 2 +
pixiu/pkg/client/proxy/descriptor_source.go | 156 +++++++++++++++++++++
pixiu/pkg/client/proxy/proxy.go | 90 ++++++++++++
pixiu/pkg/client/proxy/reflection.go | 86 ++++++++++++
pixiu/pkg/client/triple/triple.go | 57 +++++---
pixiu/pkg/filter/http/remote/call.go | 12 +-
8 files changed, 374 insertions(+), 52 deletions(-)
diff --git a/go.mod b/go.mod
index a59953ea..e1a3f0aa 100644
--- a/go.mod
+++ b/go.mod
@@ -69,7 +69,6 @@ require (
github.com/lestrrat-go/jwx v1.2.23
github.com/lucas-clemente/quic-go v0.27.0
github.com/mattn/go-isatty v0.0.14
- github.com/mercari/grpc-http-proxy v0.1.2
github.com/miekg/dns v1.1.48
github.com/mitchellh/copystructure v1.2.0
github.com/mitchellh/go-homedir v1.1.0
diff --git a/licenses/github.com/mercari/grpc-http-proxy/LICENSE b/licenses/github.com/mercari/grpc-http-proxy/LICENSE
deleted file mode 100644
index 1b5e3f22..00000000
--- a/licenses/github.com/mercari/grpc-http-proxy/LICENSE
+++ /dev/null
@@ -1,22 +0,0 @@
-Copyright (c) 2018 Mercari, Inc.
-
-MIT License
-
-Permission is hereby granted, free of charge, to any person obtaining
-a copy of this software and associated documentation files (the
-"Software"), to deal in the Software without restriction, including
-without limitation the rights to use, copy, modify, merge, publish,
-distribute, sublicense, and/or sell copies of the Software, and to
-permit persons to whom the Software is furnished to do so, subject to
-the following conditions:
-
-The above copyright notice and this permission notice shall be
-included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
-NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
-LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
-OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
-WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file
diff --git a/pixiu/pkg/client/dubbo/config.go b/pixiu/pkg/client/dubbo/config.go
index dbc752fb..85db5892 100644
--- a/pixiu/pkg/client/dubbo/config.go
+++ b/pixiu/pkg/client/dubbo/config.go
@@ -31,4 +31,6 @@ type DubboProxyConfig struct {
IsDefaultMap bool
// AutoResolve whether to resolve api config from request
AutoResolve bool `yaml:"auto_resolve" json:"auto_resolve,omitempty"`
+ // Protoset path to load protoset files
+ Protoset []string `yaml:"protoset" json:"protoset,omitempty"`
}
diff --git a/pixiu/pkg/client/proxy/descriptor_source.go b/pixiu/pkg/client/proxy/descriptor_source.go
new file mode 100644
index 00000000..3240ef02
--- /dev/null
+++ b/pixiu/pkg/client/proxy/descriptor_source.go
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains code that is copied and modified from:
+// https://github.com/fullstorydev/grpcurl/blob/v1.8.7/desc_source.go
+
+package proxy
+
+import (
+ "os"
+ "sync"
+)
+
+import (
+ "github.com/jhump/protoreflect/desc"
+ "github.com/jhump/protoreflect/dynamic"
+ "github.com/pkg/errors"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/descriptorpb"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/filter/http/grpcproxy"
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
+)
+
+var (
+ sourceOnce sync.Once
+ protosetSource grpcproxy.DescriptorSource
+)
+
+func InitProtosetSource(protoset []string) {
+ sourceOnce.Do(func() {
+ var err error
+ protosetSource, err = DescriptorSourceFromProtoset(protoset)
+ if err != nil {
+ logger.Infof("[dubbo-go-pixiu] could not load protoset
files: %v", err)
+ }
+ })
+}
+
+// DescriptorSourceFromProtoset creates a DescriptorSource that is backed by the named files,
+// whose contents are serialized FileDescriptorSet messages (protoset files). It returns an
+// error if no file name is given or if any file cannot be read or parsed.
+func DescriptorSourceFromProtoset(filenames []string)
(grpcproxy.DescriptorSource, error) {
+ if len(filenames) < 1 {
+ return nil, errors.New("no protoset files provided")
+ }
+ files := &descriptorpb.FileDescriptorSet{}
+ for _, filename := range filenames {
+ b, err := os.ReadFile(filename)
+ if err != nil {
+ return nil, errors.Errorf("wrong path to load protoset
file %q: %v", filename, err)
+ }
+ var fs descriptorpb.FileDescriptorSet
+ err = proto.Unmarshal(b, &fs)
+ if err != nil {
+ return nil, errors.Errorf("could not parse contents of
protoset file %q: %v", filename, err)
+ }
+ files.File = append(files.File, fs.File...)
+ }
+ return DescriptorSourceFromFileDescriptorSet(files)
+}
+
+// DescriptorSourceFromFileDescriptorSet creates a DescriptorSource that is backed by the FileDescriptorSet.
+func DescriptorSourceFromFileDescriptorSet(files
*descriptorpb.FileDescriptorSet) (grpcproxy.DescriptorSource, error) {
+ unresolved := map[string]*descriptorpb.FileDescriptorProto{}
+ for _, fd := range files.File {
+ unresolved[fd.GetName()] = fd
+ }
+ resolved := map[string]*desc.FileDescriptor{}
+ for _, fd := range files.File {
+ _, err := resolveFileDescriptor(unresolved, resolved,
fd.GetName())
+ if err != nil {
+ return nil, err
+ }
+ }
+ return &protosetFileSource{files: resolved}, nil
+}
+
+func resolveFileDescriptor(unresolved
map[string]*descriptorpb.FileDescriptorProto, resolved
map[string]*desc.FileDescriptor, filename string) (*desc.FileDescriptor, error)
{
+ if r, ok := resolved[filename]; ok {
+ return r, nil
+ }
+ fd, ok := unresolved[filename]
+ if !ok {
+ return nil, errors.Errorf("no descriptor found for %q",
filename)
+ }
+ deps := make([]*desc.FileDescriptor, 0, len(fd.GetDependency()))
+ for _, dep := range fd.GetDependency() {
+ depFd, err := resolveFileDescriptor(unresolved, resolved, dep)
+ if err != nil {
+ return nil, err
+ }
+ deps = append(deps, depFd)
+ }
+ result, err := desc.CreateFileDescriptor(fd, deps...)
+ if err != nil {
+ return nil, err
+ }
+ resolved[filename] = result
+ return result, nil
+}
+
+type protosetFileSource struct {
+ files map[string]*desc.FileDescriptor
+ er *dynamic.ExtensionRegistry
+ erInit sync.Once
+}
+
+func (fs *protosetFileSource) ListServices() ([]string, error) {
+ set := map[string]bool{}
+ for _, fd := range fs.files {
+ for _, svc := range fd.GetServices() {
+ set[svc.GetFullyQualifiedName()] = true
+ }
+ }
+ sl := make([]string, 0, len(set))
+ for svc := range set {
+ sl = append(sl, svc)
+ }
+ return sl, nil
+}
+
+func (fs *protosetFileSource) FindSymbol(fullyQualifiedName string)
(desc.Descriptor, error) {
+ for _, fd := range fs.files {
+ if dsc := fd.FindSymbol(fullyQualifiedName); dsc != nil {
+ return dsc, nil
+ }
+ }
+ return nil, errors.Errorf("Symbol not found: %s", fullyQualifiedName)
+}
+
+func (fs *protosetFileSource) AllExtensionsForType(typeName string)
([]*desc.FieldDescriptor, error) {
+ fs.erInit.Do(func() {
+ fs.er = &dynamic.ExtensionRegistry{}
+ for _, fd := range fs.files {
+ fs.er.AddExtensionsFromFile(fd)
+ }
+ })
+ return fs.er.AllExtensionsForType(typeName), nil
+}
diff --git a/pixiu/pkg/client/proxy/proxy.go b/pixiu/pkg/client/proxy/proxy.go
new file mode 100644
index 00000000..6b7f45b7
--- /dev/null
+++ b/pixiu/pkg/client/proxy/proxy.go
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "context"
+ "net/url"
+)
+
+import (
+ "github.com/jhump/protoreflect/dynamic"
+ "github.com/jhump/protoreflect/dynamic/grpcdynamic"
+ "github.com/jhump/protoreflect/grpcreflect"
+ "github.com/pkg/errors"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/metadata"
+ rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
+ "google.golang.org/grpc/status"
+)
+
+type Proxy struct {
+ cc *grpc.ClientConn
+ reflector *Reflector
+ stub grpcdynamic.Stub
+}
+
+// NewProxy creates a new client
+func NewProxy() *Proxy {
+ return &Proxy{}
+}
+
+// Connect opens a connection to target.
+func (p *Proxy) Connect(ctx context.Context, target *url.URL) error {
+ cc, err := grpc.DialContext(ctx, target.String(),
grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ return err
+ }
+ p.cc = cc
+ rc := grpcreflect.NewClient(ctx, rpb.NewServerReflectionClient(p.cc))
+ p.reflector = NewReflector(rc)
+ p.stub = grpcdynamic.NewStub(p.cc)
+ return err
+}
+
+// Call performs the gRPC call after doing reflection to obtain type information.
+func (p *Proxy) Call(ctx context.Context, serviceName, methodName string,
message []byte, md *metadata.MD) ([]byte, error) {
+ invocation, err := p.reflector.CreateInvocation(ctx, serviceName,
methodName, message)
+ if err != nil {
+ return nil, err
+ }
+
+ output, err := p.stub.InvokeRpc(ctx, invocation.MethodDescriptor,
invocation.Message, grpc.Header(md))
+ if err != nil {
+ stat := status.Convert(err)
+ if stat.Code() == codes.Unavailable {
+ return nil, errors.Wrap(err, "could not connect to
backend")
+ }
+
+ return nil, errors.Wrap(err, stat.Message())
+ }
+
+ outputMessage :=
dynamic.NewMessage(invocation.MethodDescriptor.GetOutputType())
+ err = outputMessage.ConvertFrom(output)
+ if err != nil {
+ return nil, errors.Wrap(err, "response from backend could not
be converted internally")
+ }
+
+ m, err := outputMessage.MarshalJSON()
+ if err != nil {
+ return nil, err
+ }
+ return m, err
+}
diff --git a/pixiu/pkg/client/proxy/reflection.go b/pixiu/pkg/client/proxy/reflection.go
new file mode 100644
index 00000000..4817e187
--- /dev/null
+++ b/pixiu/pkg/client/proxy/reflection.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package proxy
+
+import (
+ "context"
+)
+
+import (
+ "github.com/jhump/protoreflect/desc"
+ "github.com/jhump/protoreflect/dynamic"
+ "github.com/jhump/protoreflect/grpcreflect"
+ "github.com/pkg/errors"
+)
+
+type Reflector struct {
+ c *grpcreflect.Client
+}
+
+// NewReflector creates a new Reflector from the reflection client
+func NewReflector(client *grpcreflect.Client) *Reflector {
+ return &Reflector{client}
+}
+
+// CreateInvocation creates a MethodInvocation by performing reflection
+func (r *Reflector) CreateInvocation(ctx context.Context, serviceName,
methodName string, input []byte) (*MethodInvocation, error) {
+ var (
+ serviceDesc *desc.ServiceDescriptor
+ err error
+ )
+
+ if protosetSource != nil {
+ // Parsing reflection through protoset files
+ var dsc desc.Descriptor
+ dsc, err = protosetSource.FindSymbol(serviceName)
+ if err != nil {
+ return nil, errors.Errorf("service was not found
upstream even though it should have been there: %v", err)
+ }
+
+ var ok bool
+ serviceDesc, ok = dsc.(*desc.ServiceDescriptor)
+ if !ok {
+ return nil, errors.Errorf("target server does not
expose service %s", serviceName)
+ }
+ } else {
+ serviceDesc, err = r.c.ResolveService(serviceName)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ methodDesc := serviceDesc.FindMethodByName(methodName)
+ if methodDesc == nil {
+ return nil, errors.New("method not found upstream")
+ }
+ inputMessage := dynamic.NewMessage(methodDesc.GetInputType())
+ err = inputMessage.UnmarshalJSON(input)
+ if err != nil {
+ return nil, err
+ }
+ return &MethodInvocation{
+ MethodDescriptor: methodDesc,
+ Message: inputMessage,
+ }, nil
+}
+
+// MethodInvocation contains a method and a message used to invoke an RPC
+type MethodInvocation struct {
+ *desc.MethodDescriptor
+ *dynamic.Message
+}
diff --git a/pixiu/pkg/client/triple/triple.go b/pixiu/pkg/client/triple/triple.go
index 5ee2dd55..58578f9f 100644
--- a/pixiu/pkg/client/triple/triple.go
+++ b/pixiu/pkg/client/triple/triple.go
@@ -20,26 +20,25 @@ package triple
import (
"context"
"io"
+ "net/http"
"net/url"
"strings"
"sync"
)
import (
- gerrors "github.com/mercari/grpc-http-proxy/errors"
- proxymeta "github.com/mercari/grpc-http-proxy/metadata"
- "github.com/mercari/grpc-http-proxy/proxy"
"github.com/pkg/errors"
- "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
)
import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/client"
+ "github.com/apache/dubbo-go-pixiu/pixiu/pkg/client/proxy"
)
// InitDefaultTripleClient init default dubbo client
-func InitDefaultTripleClient() {
- tripleClient = NewTripleClient()
+func InitDefaultTripleClient(protoset []string) {
+ tripleClient = NewTripleClient(protoset)
}
var (
@@ -48,9 +47,10 @@ var (
)
// NewTripleClient create dubbo client
-func NewTripleClient() *Client {
+func NewTripleClient(protoset []string) *Client {
clientOnce.Do(func() {
tripleClient = &Client{}
+ proxy.InitProtosetSource(protoset)
})
return tripleClient
}
@@ -68,40 +68,51 @@ func (tc *Client) MapParams(req *client.Request) (reqData
interface{}, err error
}
// Close clear GenericServicePool.
-func (dc *Client) Close() error {
+func (tc *Client) Close() error {
return nil
}
// SingletonTripleClient singleton dubbo clent
-func SingletonTripleClient() *Client {
- return NewTripleClient()
+func SingletonTripleClient(protoset []string) *Client {
+ return NewTripleClient(protoset)
}
// Call invoke service
-func (dc *Client) Call(req *client.Request) (res interface{}, err error) {
+func (tc *Client) Call(req *client.Request) (res interface{}, err error) {
address :=
strings.Split(req.API.IntegrationRequest.HTTPBackendConfig.URL, ":")
p := proxy.NewProxy()
targetURL := &url.URL{
Scheme: address[0],
Opaque: address[1],
}
- if err := p.Connect(context.Background(), targetURL); err != nil {
- return "", errors.Errorf("connect triple server error = %s",
err)
+ if err = p.Connect(context.Background(), targetURL); err != nil {
+ return nil, errors.Errorf("connect triple server error = %s",
err)
}
+
+ header := tc.forwardHeaders(req.IngressRequest.Header)
+ ctx := metadata.NewOutgoingContext(context.Background(), header)
meta := make(map[string][]string)
- reqData, _ := io.ReadAll(req.IngressRequest.Body)
- ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)
+ reqData, err := io.ReadAll(req.IngressRequest.Body)
+ if err != nil {
+ return nil, errors.Errorf("read request body error = %s", err)
+ }
+
+ ctx, cancel := context.WithTimeout(ctx, req.Timeout)
defer cancel()
- call, err := p.Call(ctx, req.API.Method.IntegrationRequest.Interface,
req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta))
+ call, err := p.Call(ctx, req.API.Method.IntegrationRequest.Interface,
req.API.Method.IntegrationRequest.Method, reqData, (*metadata.MD)(&meta))
if err != nil {
- gerr, ok := err.(*gerrors.GRPCError)
- if ok {
- statusCode := codes.Code(gerr.StatusCode)
- if statusCode == codes.Canceled || statusCode ==
codes.DeadlineExceeded {
- return "", errors.Errorf("call triple server
timeout error = %s", err)
- }
- }
return "", errors.Errorf("call triple server error = %s", err)
}
return call, nil
}
+
+// forwardHeaders selects the headers to forward: those whose name starts with "tri-" (case-insensitive)
+func (tc *Client) forwardHeaders(header http.Header) metadata.MD {
+ md := metadata.MD{}
+ for k, vals := range header {
+ if s := strings.ToLower(k); strings.HasPrefix(s, "tri-") {
+ md.Set(k, vals...)
+ }
+ }
+ return md
+}
diff --git a/pixiu/pkg/filter/http/remote/call.go b/pixiu/pkg/filter/http/remote/call.go
index 2ba692ad..f6583539 100644
--- a/pixiu/pkg/filter/http/remote/call.go
+++ b/pixiu/pkg/filter/http/remote/call.go
@@ -104,7 +104,7 @@ func (factory *FilterFactory) Apply() error {
factory.conf.Level = level
// must init it at apply function
dubbo.InitDefaultDubboClient(factory.conf.Dpc)
- triple.InitDefaultTripleClient()
+ triple.InitDefaultTripleClient(factory.conf.Dpc.Protoset)
return nil
}
@@ -133,7 +133,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext)
filter.FilterStatus {
typ := api.Method.IntegrationRequest.RequestType
- cli, err := matchClient(typ)
+ cli, err := f.matchClient(typ)
if err != nil {
panic(err)
}
@@ -142,7 +142,7 @@ func (f *Filter) Decode(c *contexthttp.HttpContext)
filter.FilterStatus {
req.Timeout = c.Timeout
resp, err := cli.Call(req)
if err != nil {
- logger.Errorf("[dubbo-go-pixiu] client call err:%v!", err)
+ logger.Errorf("[dubbo-go-pixiu] client call err: %v!", err)
if strings.Contains(strings.ToLower(err.Error()), "timeout") {
c.SendLocalReply(http.StatusGatewayTimeout,
[]byte(fmt.Sprintf("client call timeout err: %s", err)))
return filter.Stop
@@ -151,19 +151,19 @@ func (f *Filter) Decode(c *contexthttp.HttpContext)
filter.FilterStatus {
return filter.Stop
}
- logger.Debugf("[dubbo-go-pixiu] client call resp:%v", resp)
+ logger.Debugf("[dubbo-go-pixiu] client call resp: %v", resp)
c.SourceResp = resp
return filter.Continue
}
-func matchClient(typ apiConf.RequestType) (client.Client, error) {
+func (f *Filter) matchClient(typ apiConf.RequestType) (client.Client, error) {
switch strings.ToLower(string(typ)) {
case string(apiConf.DubboRequest):
return dubbo.SingletonDubboClient(), nil
// todo @(laurence) add triple to apiConf
case "triple":
- return triple.SingletonTripleClient(), nil
+ return triple.SingletonTripleClient(f.conf.Dpc.Protoset), nil
case string(apiConf.HTTPRequest):
return clienthttp.SingletonHTTPClient(), nil
default: