This is an automated email from the ASF dual-hosted git repository. alexstocks pushed a commit to branch fix/getty-eof in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
commit a2869eb2865e57f1ce67b8fcf0da6524ceb467ab Author: AlexStocks <[email protected]> AuthorDate: Sat Apr 9 15:34:56 2022 +0800 return decode value as getty standard --- protocol/dubbo/dubbo_codec.go | 34 +++++++++++++++++++++------------- protocol/dubbo/impl/codec.go | 3 +++ remoting/getty/readwriter.go | 9 +++++---- 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/protocol/dubbo/dubbo_codec.go b/protocol/dubbo/dubbo_codec.go index 31b13dc00..03f228d00 100644 --- a/protocol/dubbo/dubbo_codec.go +++ b/protocol/dubbo/dubbo_codec.go @@ -34,7 +34,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/dubbo/impl" - "dubbo.apache.org/dubbo-go/v3/protocol/invocation" + invct "dubbo.apache.org/dubbo-go/v3/protocol/invocation" "dubbo.apache.org/dubbo-go/v3/remoting" ) @@ -161,19 +161,25 @@ func (c *DubboCodec) EncodeResponse(response *remoting.Response) (*bytes.Buffer, // Decode data, including request and response. func (c *DubboCodec) Decode(data []byte) (remoting.DecodeResult, int, error) { + var res remoting.DecodeResult + + dataLen := len(data) + if dataLen < impl.HEADER_LENGTH { // check whether header bytes is enough or not + return res, 0, nil + } if c.isRequest(data) { - req, len, err := c.decodeRequest(data) + req, length, err := c.decodeRequest(data) if err != nil { - return remoting.DecodeResult{}, len, perrors.WithStack(err) + return remoting.DecodeResult{}, length, perrors.WithStack(err) } - return remoting.DecodeResult{IsRequest: true, Result: req}, len, perrors.WithStack(err) + return remoting.DecodeResult{IsRequest: true, Result: req}, length, perrors.WithStack(err) } - resp, len, err := c.decodeResponse(data) + resp, length, err := c.decodeResponse(data) if err != nil { - return remoting.DecodeResult{}, len, perrors.WithStack(err) + return remoting.DecodeResult{}, length, perrors.WithStack(err) } - return remoting.DecodeResult{IsRequest: false, Result: resp}, len, perrors.WithStack(err) + return remoting.DecodeResult{IsRequest: false, Result: resp}, length, perrors.WithStack(err) } func (c *DubboCodec) isRequest(data []byte) bool { @@ -182,16 +188,18 @@ func (c *DubboCodec) isRequest(data []byte) bool { // decode request func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) { - var request *remoting.Request = nil + var request *remoting.Request buf := bytes.NewBuffer(data) pkg := impl.NewDubboPackage(buf) pkg.SetBody(make([]interface{}, 7)) err := pkg.Unmarshal() if err != nil { originErr := perrors.Cause(err) - if originErr == hessian.ErrHeaderNotEnough || originErr == hessian.ErrBodyNotEnough { - // FIXME - return nil, 0, originErr + if originErr == hessian.ErrHeaderNotEnough { // this is impossible, as dubbo_codec.go:DubboCodec::Decode() line 167 + return nil, 0, nil + } + if originErr == hessian.ErrBodyNotEnough { + return nil, hessian.HEADER_LENGTH + pkg.GetBodyLen(), nil } logger.Errorf("pkg.Unmarshal(len(@data):%d) = error:%+v", buf.Len(), err) @@ -223,8 +231,8 @@ func (c *DubboCodec) decodeRequest(data []byte) (*remoting.Request, int, error) methodName = pkg.Service.Method args = req[impl.ArgsKey].([]interface{}) attachments = req[impl.AttachmentsKey].(map[string]interface{}) - invoc := invocation.NewRPCInvocationWithOptions(invocation.WithAttachments(attachments), - invocation.WithArguments(args), invocation.WithMethodName(methodName)) + invoc := invct.NewRPCInvocationWithOptions(invct.WithAttachments(attachments), + invct.WithArguments(args), invct.WithMethodName(methodName)) request.Data = invoc } diff --git a/protocol/dubbo/impl/codec.go b/protocol/dubbo/impl/codec.go index cc7abad48..95acbd9e0 100644 --- a/protocol/dubbo/impl/codec.go +++ b/protocol/dubbo/impl/codec.go @@ -157,6 +157,9 @@ func (c *ProtocolCodec) Decode(p *DubboPackage) error { return err } } + if c.reader.Size() < p.GetBodyLen() { + return hessian.ErrBodyNotEnough + } body, err := c.reader.Peek(p.GetBodyLen()) if err != nil { return err diff --git a/remoting/getty/readwriter.go b/remoting/getty/readwriter.go index c78f354d6..48f2466c5 100644 --- a/remoting/getty/readwriter.go +++ b/remoting/getty/readwriter.go @@ -49,15 +49,16 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler { // and send to client each time. the Read can assemble it. func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { resp, length, err := (p.client.codec).Decode(data) - // err := pkg.Unmarshal(buf, p.client) if err != nil { - if errors.Is(err, hessian.ErrHeaderNotEnough) || errors.Is(err, hessian.ErrBodyNotEnough) { + if errors.Is(err, hessian.ErrHeaderNotEnough) { return nil, 0, nil } + if errors.Is(err, hessian.ErrBodyNotEnough) { + return nil, length, nil + } logger.Errorf("pkg.Unmarshal(ss:%+v, len(@data):%d) = error:%+v", ss, len(data), err) - - return nil, length, err + return nil, 0, err } return resp, length, nil
