[ 
https://issues.apache.org/jira/browse/THRIFT-5628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605158#comment-17605158
 ] 

Philip Lee commented on THRIFT-5628:
------------------------------------

The application polls some hardware collecting data for an extended period.

The generated interface is:
{code:java}
  public partial class SystemControl
  {
    public interface IAsync
 ...
{code}
A single instance of SystemControl is created like so, where host is an IP 
address string. This code used to be simpler (the else branch of the if 
statement), but that failed when there was no DNS on the network (see THRIFT-5610).
{code:java}
    public static SystemControl.IAsync DefaultFactoryAsync(string host, int port)
    {
        if (System.Net.IPAddress.TryParse(host, out var address))
        {
            // Create a TcpClient
            var client = new TcpClient(address.ToString(), port)
            {
                // duplicate setting used in other TSocketTransport constructors
                NoDelay = true
            };
            // using 'new TConfiguration { MaxMessageSize = int.MaxValue }' as a workaround for now
            var socket = new TSocketTransport(client, null);
            var transport = new TBufferedTransport(socket);
            var protocol = new TBinaryProtocol(transport);
            return new SystemControl.Client(protocol);
        }
        else
        {
            // TSocketTransport('hostname/ip-address', portnumber, ...) calls Dns.GetHostEntry(host),
            // which fails for the internal network of the instrument.
            var socket = new TSocketTransport(host, port, null);
            var transport = new TBufferedTransport(socket);
            var protocol = new TBinaryProtocol(transport);
            return new SystemControl.Client(protocol);
        }
    }
{code}
The hardware is then polled with code similar to
{code:java}
        _task = Task.Run(async () => {
              var result = await _client.get_next_waveform(sessionId, timeout, cancellationToken);
              await callback.Invoke(result);
        });
{code}
The Thrift compiler generated the following for get_next_waveform:
{code:java}
      public async global::System.Threading.Tasks.Task<global::TeraView.TeraPulse.Rpc.Thrift.Waveform> get_next_waveform(string session, double timeout, CancellationToken cancellationToken = default)
      {
        await send_get_next_waveform(session, timeout, cancellationToken);
        return await recv_get_next_waveform(cancellationToken);
      }
      public async global::System.Threading.Tasks.Task send_get_next_waveform(string session, double timeout, CancellationToken cancellationToken = default)
      {
        await OutputProtocol.WriteMessageBeginAsync(new TMessage("get_next_waveform", TMessageType.Call, SeqId), cancellationToken);
        
        var tmp194 = new InternalStructs.get_next_waveform_args() {
          Session = session,
          Timeout = timeout,
        };
        
        await tmp194.WriteAsync(OutputProtocol, cancellationToken);
        await OutputProtocol.WriteMessageEndAsync(cancellationToken);
        await OutputProtocol.Transport.FlushAsync(cancellationToken);
      }
      public async global::System.Threading.Tasks.Task<global::TeraView.TeraPulse.Rpc.Thrift.Waveform> recv_get_next_waveform(CancellationToken cancellationToken = default)
      {
        
        var tmp195 = await InputProtocol.ReadMessageBeginAsync(cancellationToken);
        if (tmp195.Type == TMessageType.Exception)
        {
          var tmp196 = await TApplicationException.ReadAsync(InputProtocol, cancellationToken);
          await InputProtocol.ReadMessageEndAsync(cancellationToken);
          throw tmp196;
        }
        var tmp197 = new InternalStructs.get_next_waveform_result();
        await tmp197.ReadAsync(InputProtocol, cancellationToken);
        await InputProtocol.ReadMessageEndAsync(cancellationToken);
        if (tmp197.__isset.success)
        {
          return tmp197.Success;
        }
        if (tmp197.__isset.exc)
        {
          throw tmp197.Exc;
        }
        throw new TApplicationException(TApplicationException.ExceptionType.MissingResult, "get_next_waveform failed: unknown result");
      }
{code}
I'm thinking that ReadMessageEndAsync should reset the consumed message size, so that each message starts with a fresh MaxMessageSize budget.
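
To illustrate the failure mode, here is a minimal toy model of a read budget that is decremented on every read and only replenished if something resets it at the end of each message. It is a sketch and not Thrift's code: ReadBudget, Consume, Reset and Poll are hypothetical names chosen for illustration, and the sizes are made up.

```csharp
using System;

// Toy model only; these types are hypothetical and not part of Thrift.
sealed class ReadBudget
{
    private readonly long _max;
    public long Remaining { get; private set; }

    public ReadBudget(long max) { _max = max; Remaining = max; }

    // Called for every chunk read; fails once the budget is used up.
    public void Consume(long bytes)
    {
        if (Remaining < bytes)
            throw new InvalidOperationException("MaxMessageSize reached");
        Remaining -= bytes;
    }

    // Called when a complete message has been read.
    public void Reset() => Remaining = _max;
}

static class Demo
{
    // Returns how many messages were read before the budget ran out,
    // or 'messages' if every message was read.
    public static int Poll(ReadBudget budget, int messages, long bytesPerMessage, bool resetPerMessage)
    {
        for (var i = 0; i < messages; i++)
        {
            try { budget.Consume(bytesPerMessage); }
            catch (InvalidOperationException) { return i; }
            if (resetPerMessage) budget.Reset();
        }
        return messages;
    }

    static void Main()
    {
        // Without a reset, a 1000-byte budget is exhausted after 3 messages of 300 bytes.
        Console.WriteLine(Poll(new ReadBudget(1000), 100, 300, resetPerMessage: false)); // 3
        // With a reset at the end of each message, all 100 messages are read.
        Console.WriteLine(Poll(new ReadBudget(1000), 100, 300, resetPerMessage: true));  // 100
    }
}
```

In this model a long-lived client fails after a fixed number of bytes no matter how small each message is, which matches the symptom described in the issue; resetting once per message keeps the limit as a per-message cap.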

 

> MaxMessageSize is never reset on a read buffer
> ----------------------------------------------
>
>                 Key: THRIFT-5628
>                 URL: https://issues.apache.org/jira/browse/THRIFT-5628
>             Project: Thrift
>          Issue Type: Bug
>          Components: netstd - Library
>    Affects Versions: 0.16.0
>            Reporter: Philip Lee
>            Priority: Major
>
> It appears that for the ReadBuffer of a TMemoryBufferTransport the method 
> CountConsumedMessageBytes() is called, but ResetConsumedMessageSize() is 
> never called. 
> Our code has a long-lived client which polls periodically for an extended 
> time. RemainingMessageSize eventually falls to <= 0 and a 
> TTransportException("MaxMessageSize reached") is then thrown.
> Is this a bug or expected?
> I can fix this by changing TMemoryBufferTransport as follows
> {code:java}
> public override ValueTask<int> ReadAsync(byte[] buffer, int offset, int length, CancellationToken cancellationToken)
> {
>   var count = Math.Min(Length - Position, length);
>   Buffer.BlockCopy(Bytes, Position, buffer, offset, count);
>   Position += count;
>   CountConsumedMessageBytes(count);
>   ResetConsumedMessageSize(); // <--- added line
>   return new ValueTask<int>(count);
> }{code}
> but I am not confident this is correct.
> Or, as a workaround, I can set TConfiguration.MaxMessageSize = int.MaxValue, 
> which will allow our code to operate for longer (20x) before failing.
> Or I can recreate the client periodically.
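
On the "20x" figure in the quoted description: it is consistent with a default MaxMessageSize of 100 MiB, which is an assumption here and should be checked against TConfiguration in the Thrift version in use. A small sketch of the arithmetic, with the hypothetical names Headroom and AssumedDefaultMaxMessageSize:

```csharp
using System;
using System.Globalization;

static class Headroom
{
    // Assumption: the library default is 100 MiB; verify for your Thrift version.
    public const int AssumedDefaultMaxMessageSize = 100 * 1024 * 1024;

    // How many times more bytes int.MaxValue allows than the assumed default.
    public static double Factor() => (double)int.MaxValue / AssumedDefaultMaxMessageSize;

    static void Main()
    {
        Console.WriteLine(Factor().ToString("F2", CultureInfo.InvariantCulture)); // 20.48
    }
}
```

Raising the limit therefore only postpones the failure by that factor; it does not remove it.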



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
