[ https://issues.apache.org/jira/browse/THRIFT-5628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605158#comment-17605158 ]
Philip Lee commented on THRIFT-5628: ------------------------------------ The application polls some hardware collecting data for an extended period. The generate interface is {code:java} public partial class SystemControl { public interface IAsync ... {code} A single instance of SystemControl is created like so, where host is an ip address string. This code used to be simpler (2nd part of if statement) but this failed when there was no DNS on the network (see Thrift-5610). {code:java} public static SystemControl.IAsync DefaultFactoryAsync(string host, int port) { if (System.Net.IPAddress.TryParse(host, out var address)) { // Create a TcpClient var client = new TcpClient(address.ToString(), port) { // duplicate setting used in other TSocketTransport constructors NoDelay = true }; var socket = new TSocketTransport(client, null); // using 'new TConfiguration { MaxMessageSize = int.MaxValue }' as a work around for now var transport = new TBufferedTransport(socket); var protocol = new TBinaryProtocol(transport); return new SystemControl.Client(protocol); } else { // TSocketTransport('hostname/ip-address', portnumber, ...) calls Dns.GetHostEntry(host) // which fails for the internal network of the instrument. var socket = new TSocketTransport(host, port, null); var transport = new TBufferedTransport(socket); var protocol = new TBinaryProtocol(transport); return new SystemControl.Client(protocol); } } {code} The hardware is then polled with code similar to {code:java} _task = Task.Run(async () => { var result = await _client.get_next_waveform(sessionId, timeout, cancellationToken); await callback.Invoke(result); } {code} Where the Thrift compiler has generated this {code:java} public async global::System.Threading.Tasks.Task<global::TeraView.TeraPulse.Rpc.Thrift.Waveform> get_next_waveform(string session, double timeout, CancellationToken cancellationToken = default) { await send_get_next_waveform(session, timeout, cancellationToken); return await recv_get_next_waveform(cancellationToken); } public async global::System.Threading.Tasks.Task send_get_next_waveform(string session, double timeout, CancellationToken cancellationToken = default) { await OutputProtocol.WriteMessageBeginAsync(new TMessage("get_next_waveform", TMessageType.Call, SeqId), cancellationToken); var tmp194 = new InternalStructs.get_next_waveform_args() { Session = session, Timeout = timeout, }; await tmp194.WriteAsync(OutputProtocol, cancellationToken); await OutputProtocol.WriteMessageEndAsync(cancellationToken); await OutputProtocol.Transport.FlushAsync(cancellationToken); } public async global::System.Threading.Tasks.Task<global::TeraView.TeraPulse.Rpc.Thrift.Waveform> recv_get_next_waveform(CancellationToken cancellationToken = default) { var tmp195 = await InputProtocol.ReadMessageBeginAsync(cancellationToken); if (tmp195.Type == TMessageType.Exception) { var tmp196 = await TApplicationException.ReadAsync(InputProtocol, cancellationToken); await InputProtocol.ReadMessageEndAsync(cancellationToken); throw tmp196; } var tmp197 = new InternalStructs.get_next_waveform_result(); await tmp197.ReadAsync(InputProtocol, cancellationToken); await InputProtocol.ReadMessageEndAsync(cancellationToken); if (tmp197.__isset.success) { return tmp197.Success; } if (tmp197.__isset.exc) { throw tmp197.Exc; } throw new TApplicationException(TApplicationException.ExceptionType.MissingResult, "get_next_waveform failed: unknown result"); } {code} I'm thinking that ReadMessageEndAsync should reset MaxMessageSize. > MaxMessageSize is never reset on a read buffer > ---------------------------------------------- > > Key: THRIFT-5628 > URL: https://issues.apache.org/jira/browse/THRIFT-5628 > Project: Thrift > Issue Type: Bug > Components: netstd - Library > Affects Versions: 0.16.0 > Reporter: Philip Lee > Priority: Major > > It appears that for the ReadBuffer of a TMemoryBufferTransport the method > CountConsumedMessageBytes() is called, but ResetConsumedMessageSize() is > never called. > Our code as a long lived client which is polling periodically for an extended > time. RemainingMessageSize eventually falls to <= 0 and a > TTransportException("MaxMessageSize reached") is then thrown. > Is this a bug or expected? > I can fix this by changing TMemoryBufferTransport as follows > {code:java} > public override ValueTask<int> ReadAsync(byte[] buffer, int offset, int > length, CancellationToken cancellationToken) > { > var count = Math.Min(Length - Position, length); > Buffer.BlockCopy(Bytes, Position, buffer, offset, count); > Position += count; > CountConsumedMessageBytes(count); > ---> ResetConsumedMessageSize(); > return new ValueTask<int>(count); > }{code} > but not confident this is correct. > Or as a work around I can set TConfiguration.MaxMessageSize = int.MaxValue > which will allow our code to operate for longer (20x) before failing. > Or I can recreate the client periodically. -- This message was sent by Atlassian Jira (v8.20.10#820010)