[ 
https://issues.apache.org/jira/browse/THRIFT-4187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010726#comment-16010726
 ] 

ASF GitHub Bot commented on THRIFT-4187:
----------------------------------------

Github user markerickson-wf commented on a diff in the pull request:

    https://github.com/apache/thrift/pull/1269#discussion_r116525084
  
    --- Diff: lib/dart/lib/src/transport/t_framed_transport.dart ---
    @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport {
           if (got > 0) return got;
         }
     
    -    _readFrame();
    +    // IMPORTANT: by the time you've got here,
    +    // an entire frame is available for reading
     
         return super.read(buffer, offset, length);
       }
     
       void _readFrame() {
    -    _transport.readAll(headerBytes, 0, headerByteCount);
    -    int size = headerBytes.buffer.asByteData().getUint32(0);
    +    if (_body == null) {
    +      bool gotFullHeader = _readFrameHeader();
    +      if (!gotFullHeader) {
    +        return;
    +      }
    +    }
    +
    +    _readFrameBody();
    +  }
    +
    +  bool _readFrameHeader() {
    +    var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes;
     
    -    if (size < 0) {
    +    int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes);
    +    if (got < 0) {
           throw new TTransportError(
    -          TTransportErrorType.UNKNOWN, "Read a negative frame size: $size");
    +          TTransportErrorType.UNKNOWN, "Socket closed during frame header read");
         }
     
    -    Uint8List buffer = new Uint8List(size);
    -    _transport.readAll(buffer, 0, size);
    -    _setReadBuffer(buffer);
    +    _receivedHeaderBytes += got;
    +
    +    if (_receivedHeaderBytes == headerByteCount) {
    +      int size = _headerBytes.buffer.asByteData().getUint32(0);
    +
    +      _receivedHeaderBytes = 0;
    +
    +      if (size < 0) {
    +        throw new TTransportError(
    +            TTransportErrorType.UNKNOWN, "Read a negative frame size: $size");
    +      }
    +
    +      _bodySize = size;
    +      _body = new Uint8List(_bodySize);
    +      _receivedBodyBytes = 0;
    +
    +      return true;
    +    } else {
    +      _registerForReadableBytes();
    +      return false;
    +    }
    +  }
    +
    +  void _readFrameBody() {
    +    var remainingBodyBytes = _bodySize - _receivedBodyBytes;
    +
    +    int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes);
    +    if (got < 0) {
    +      throw new TTransportError(
    +          TTransportErrorType.UNKNOWN, "Socket closed during frame body read");
    +    }
    +
    +    _receivedBodyBytes += got;
    +
    +    if (_receivedBodyBytes == _bodySize) {
    +      var body = _body;
    +
    +      _bodySize = 0;
    +      _body = null;
    +      _receivedBodyBytes = 0;
    +
    +      _setReadBuffer(body);
    +
    +      var completer = _frameCompleter;
    +      _frameCompleter = null;
    +      completer.complete(new Object());
    +    } else {
    +      _registerForReadableBytes();
    +    }
       }
     
       Future flush() {
    -    Uint8List buffer = consumeWriteBuffer();
    -    int length = buffer.length;
    +    if (_frameCompleter == null) {
    +      Uint8List buffer = consumeWriteBuffer();
    +      int length = buffer.length;
    +
    +      _headerBytes.buffer.asByteData().setUint32(0, length);
    +      _transport.write(_headerBytes, 0, headerByteCount);
    +      _transport.write(buffer, 0, length);
    +
    +      _frameCompleter  = new Completer<Object>(); // FIXME: .sync?!
    --- End diff --
    
    
https://api.dartlang.org/stable/1.23.0/dart-async/Completer/Completer.sync.html
    
    Despite the warning in the docs linked above, I used `Completer.sync` [in t_http_transport](https://github.com/apache/thrift/blob/master/lib/dart/lib/src/transport/t_http_transport.dart#L52).
    If I recall correctly, this is a symptom of adapting the sync API to an async system. If two consecutive messages arrive on the socket, the second could stomp on the first before the listener has a chance to react, i.e. `flush` -> `complete message 1` -> `complete message 2` -> `read` -> `read`. With a sync completer, I think the first listener should be able to read before the second message is completed.
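
To make the ordering concern concrete, here is a minimal standalone sketch (not code from the PR). It only uses `dart:async`; the function name `demoOrdering`, the `useSync` flag, and the message strings are made up for illustration, and it models "a message arrives" as completing a completer rather than touching a real transport.

```dart
import 'dart:async';

/// Hypothetical helper: records the order in which two completions and
/// their listeners run, using either Completer() or Completer.sync().
Future<List<String>> demoOrdering(bool useSync) async {
  final events = <String>[];

  Completer<String> newCompleter() =>
      useSync ? new Completer<String>.sync() : new Completer<String>();

  final first = newCompleter();
  final second = newCompleter();

  // Each listener stands in for the code that would call read().
  first.future.then((m) => events.add('listener saw $m'));
  second.future.then((m) => events.add('listener saw $m'));

  // Two messages "arrive" back to back in the same synchronous run.
  events.add('completing message 1');
  first.complete('message 1');
  events.add('completing message 2');
  second.complete('message 2');

  // Give any listeners scheduled as microtasks a chance to run.
  await new Future<void>.delayed(Duration.zero);
  return events;
}

Future<void> main() async {
  print(await demoOrdering(false));
  print(await demoOrdering(true));
}
```

With the default completer, both "completing" entries are recorded before either listener runs, which is the `complete message 1` -> `complete message 2` -> `read` -> `read` sequence described above. With `Completer.sync`, each listener runs inside the corresponding `complete` call, so the first listener is done before the second message is completed. Whether that is enough to make the framed transport safe depends on how the single read buffer is shared, which this sketch does not model.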


> Dart -> Rust Framed cross tests fail
> ------------------------------------
>
>                 Key: THRIFT-4187
>                 URL: https://issues.apache.org/jira/browse/THRIFT-4187
>             Project: Thrift
>          Issue Type: Bug
>          Components: Rust - Library
>            Reporter: Allen George
>            Assignee: Allen George
>            Priority: Minor
>
> For some reason the Dart (client) -> Rust (server) framed-transport cross 
> tests fail. Initial investigation shows that *somehow* the Dart client thinks 
> the socket was closed, which means it doesn't read the message from the 
> underlying transport.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
