[ https://issues.apache.org/jira/browse/THRIFT-4187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010671#comment-16010671 ]
ASF GitHub Bot commented on THRIFT-4187: ---------------------------------------- Github user allengeorge commented on a diff in the pull request: https://github.com/apache/thrift/pull/1269#discussion_r116517213 --- Diff: lib/dart/lib/src/transport/t_framed_transport.dart --- @@ -51,33 +58,112 @@ class TFramedTransport extends TBufferedTransport { if (got > 0) return got; } - _readFrame(); + // IMPORTANT: by the time you've got here, + // an entire frame is available for reading return super.read(buffer, offset, length); } void _readFrame() { - _transport.readAll(headerBytes, 0, headerByteCount); - int size = headerBytes.buffer.asByteData().getUint32(0); + if (_body == null) { + bool gotFullHeader = _readFrameHeader(); + if (!gotFullHeader) { + return; + } + } + + _readFrameBody(); + } + + bool _readFrameHeader() { + var remainingHeaderBytes = headerByteCount - _receivedHeaderBytes; - if (size < 0) { + int got = _transport.read(_headerBytes, _receivedHeaderBytes, remainingHeaderBytes); + if (got < 0) { throw new TTransportError( - TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + TTransportErrorType.UNKNOWN, "Socket closed during frame header read"); } - Uint8List buffer = new Uint8List(size); - _transport.readAll(buffer, 0, size); - _setReadBuffer(buffer); + _receivedHeaderBytes += got; + + if (_receivedHeaderBytes == headerByteCount) { + int size = _headerBytes.buffer.asByteData().getUint32(0); + + _receivedHeaderBytes = 0; + + if (size < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Read a negative frame size: $size"); + } + + _bodySize = size; + _body = new Uint8List(_bodySize); + _receivedBodyBytes = 0; + + return true; + } else { + _registerForReadableBytes(); + return false; + } + } + + void _readFrameBody() { + var remainingBodyBytes = _bodySize - _receivedBodyBytes; + + int got = _transport.read(_body, _receivedBodyBytes, remainingBodyBytes); + if (got < 0) { + throw new TTransportError( + TTransportErrorType.UNKNOWN, "Socket closed during frame body read"); + } + + _receivedBodyBytes += got; + + if (_receivedBodyBytes == _bodySize) { + var body = _body; + + _bodySize = 0; + _body = null; + _receivedBodyBytes = 0; + + _setReadBuffer(body); + + var completer = _frameCompleter; + _frameCompleter = null; + completer.complete(new Object()); + } else { + _registerForReadableBytes(); + } } Future flush() { - Uint8List buffer = consumeWriteBuffer(); - int length = buffer.length; + if (_frameCompleter == null) { + Uint8List buffer = consumeWriteBuffer(); + int length = buffer.length; + + _headerBytes.buffer.asByteData().setUint32(0, length); + _transport.write(_headerBytes, 0, headerByteCount); + _transport.write(buffer, 0, length); + + _frameCompleter = new Completer<Object>(); // FIXME: .sync?! --- End diff -- Ah. Actually I need advice from you as to whether this has to be a `sync` `Completer` or not :) It's not quite clear to me when one would choose one, and whether that's necessary at this point. > Dart -> Rust Framed cross tests fail > ------------------------------------ > > Key: THRIFT-4187 > URL: https://issues.apache.org/jira/browse/THRIFT-4187 > Project: Thrift > Issue Type: Bug > Components: Rust - Library > Reporter: Allen George > Assignee: Allen George > Priority: Minor > > For some reason the Dart (client) -> Rust (server) framed-transport cross > tests fail. Initial investigation shows that *somehow* the dart client think > the socket was closed, which means it doesn't read the message from the > underlying transport. -- This message was sent by Atlassian JIRA (v6.3.15#6346)