This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c73cc6d refactor(csharp): use semaphore in tcp implementation  (#2179)
5c73cc6d is described below

commit 5c73cc6dcf540bbb6b2a57c1c4d4b1cdfd5d6a80
Author: Łukasz Zborek <[email protected]>
AuthorDate: Sat Sep 27 19:02:53 2025 +0200

    refactor(csharp): use semaphore in tcp implementation  (#2179)
    
    Refactor the current TCP implementation to use a semaphore.
    This ensures that only one thread at a time uses a given client
    for TCP communication.
---
 foreign/csharp/DEPENDENCIES.md                     |   6 +-
 foreign/csharp/Directory.Packages.props            |   6 +-
 .../Iggy_SDK.Tests.Integration/OffsetTests.cs      |  15 +
 .../csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs  |   4 +-
 .../IggyClient/Implementations/TcpMessageStream.cs | 414 +++++----------------
 foreign/csharp/README.md                           |   6 +-
 6 files changed, 120 insertions(+), 331 deletions(-)

diff --git a/foreign/csharp/DEPENDENCIES.md b/foreign/csharp/DEPENDENCIES.md
index c0197206..f39b6a96 100644
--- a/foreign/csharp/DEPENDENCIES.md
+++ b/foreign/csharp/DEPENDENCIES.md
@@ -9,10 +9,10 @@ Microsoft.NET.Test.Sdk: "17.14.1", "MIT",
 Microsoft.Testing.Extensions.CodeCoverage: "17.14.2", "MIT",
 Microsoft.Testing.Extensions.TrxReport: "1.8.2", "MIT",
 Moq: "4.20.72", "BSD-3-Clause",
-Reqnroll.xUnit: "2.4.1", "BSD-3-Clause",
+Reqnroll.xUnit: "3.0.3", "BSD-3-Clause",
 Shouldly: "4.3.0", "BSD-3-Clause",
 System.IO.Hashing: "8.0.0", "MIT",
-Testcontainers: "4.6.0", "MIT",
-TUnit: "0.57.65", "MIT",
+Testcontainers: "4.7.0", "MIT",
+TUnit: "0.58.3", "MIT",
 xunit: "2.9.3", "Apache-2.0",
 xunit.runner.visualstudio: "3.1.3", "Apache-2.0",
diff --git a/foreign/csharp/Directory.Packages.props 
b/foreign/csharp/Directory.Packages.props
index 67d869cf..946b2959 100644
--- a/foreign/csharp/Directory.Packages.props
+++ b/foreign/csharp/Directory.Packages.props
@@ -11,13 +11,13 @@
         <PackageVersion Include="Microsoft.Extensions.Logging.Console" 
Version="8.0.1" />
         <PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
         <PackageVersion Include="Microsoft.Testing.Extensions.CodeCoverage" 
Version="17.14.2" />
-        <PackageVersion Include="Microsoft.Testing.Extensions.TrxReport" 
Version="1.8.2" />
+        <PackageVersion Include="Microsoft.Testing.Extensions.TrxReport" 
Version="1.8.4" />
         <PackageVersion Include="Moq" Version="4.20.72" />
-        <PackageVersion Include="Reqnroll.xUnit" Version="2.4.1" />
+        <PackageVersion Include="Reqnroll.xUnit" Version="3.0.3" />
         <PackageVersion Include="Shouldly" Version="4.3.0" />
         <PackageVersion Include="System.IO.Hashing" Version="8.0.0" />
         <PackageVersion Include="Testcontainers" Version="4.7.0" />
-        <PackageVersion Include="TUnit" Version="0.57.65" />
+        <PackageVersion Include="TUnit" Version="0.58.3" />
         <PackageVersion Include="xunit" Version="2.9.3" />
         <PackageVersion Include="xunit.runner.visualstudio" Version="3.1.4">
             <PrivateAssets>all</PrivateAssets>
diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/OffsetTests.cs 
b/foreign/csharp/Iggy_SDK.Tests.Integration/OffsetTests.cs
index 70136132..9a22aed7 100644
--- a/foreign/csharp/Iggy_SDK.Tests.Integration/OffsetTests.cs
+++ b/foreign/csharp/Iggy_SDK.Tests.Integration/OffsetTests.cs
@@ -82,4 +82,19 @@ public class OffsetTests
         offset.PartitionId.ShouldBe(1);
         offset.CurrentOffset.ShouldBe(3u);
     }
+
+    [Test]
+    
[DependsOn(nameof(StoreOffset_ConsumerGroup_Should_StoreOffset_Successfully))]
+    
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
+    public async Task 
GetOffset_ConsumerGroup_ByName_Should_GetOffset_Successfully(Protocol protocol)
+    {
+        var offset = await 
Fixture.Clients[protocol].GetOffsetAsync(Consumer.Group("test_consumer_group"),
+            Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), 
Identifier.String(Fixture.TopicRequest.Name),
+            1);
+
+        offset.ShouldNotBeNull();
+        offset.StoredOffset.ShouldBe(SetOffset);
+        offset.PartitionId.ShouldBe(1);
+        offset.CurrentOffset.ShouldBe(3u);
+    }
 }
diff --git a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs 
b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
index 1b5b3b8a..631fc523 100644
--- a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
+++ b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
@@ -749,7 +749,7 @@ internal static class TcpContracts
         bytes.WriteBytesFromIdentifier(consumer.Id, 1);
         var position = 1 + consumer.Id.Length + 2;
         bytes.WriteBytesFromStreamAndTopicIdentifiers(streamId, topicId, 
position);
-        position = 7 + 2 + streamId.Length + 2 + topicId.Length;
+        position += + 2 + streamId.Length + 2 + topicId.Length;
         BinaryPrimitives.WriteUInt32LittleEndian(bytes[position..(position + 
4)], partitionId ?? 0);
         return bytes.ToArray();
     }
@@ -809,4 +809,4 @@ internal static class TcpContracts
         BinaryPrimitives.WriteUInt32LittleEndian(bytes[position..(position + 
4)], partitionId ?? 0);
         return bytes.ToArray();
     }
-}
\ No newline at end of file
+}
diff --git 
a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs 
b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
index 3b30d652..4294409d 100644
--- a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
+++ b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using System.Buffers;
 using System.Buffers.Binary;
 using System.IO.Hashing;
 using System.Runtime.CompilerServices;
@@ -44,6 +43,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
     private readonly ILogger<TcpMessageStream> _logger;
     private readonly IMessageInvoker? _messageInvoker;
     private readonly MessagePollingSettings _messagePollingSettings;
+    private readonly SemaphoreSlim _semaphore;
     private readonly IConnectionStream _stream;
 
     internal TcpMessageStream(IConnectionStream stream, 
Channel<MessageSendRequest>? channel,
@@ -55,12 +55,14 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         _messagePollingSettings = messagePollingSettings;
         _messageInvoker = messageInvoker;
         _logger = loggerFactory.CreateLogger<TcpMessageStream>();
+        _semaphore = new SemaphoreSlim(1, 1);
     }
 
     public void Dispose()
     {
         _stream.Close();
         _stream.Dispose();
+        _semaphore.Dispose();
     }
 
     public async Task<StreamResponse?> CreateStreamAsync(string name, uint? 
streamId, CancellationToken token = default)
@@ -69,10 +71,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.CREATE_STREAM_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -88,10 +87,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_STREAM_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -107,10 +103,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_STREAMS_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -126,10 +119,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.UPDATE_STREAM_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task PurgeStreamAsync(Identifier streamId, CancellationToken 
token = default)
@@ -138,10 +128,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.PURGE_STREAM_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task DeleteStreamAsync(Identifier streamId, CancellationToken 
token = default)
@@ -150,10 +137,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.DELETE_STREAM_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task<IReadOnlyList<TopicResponse>> GetTopicsAsync(Identifier 
streamId,
@@ -163,10 +147,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_TOPICS_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -183,10 +164,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_TOPIC_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -207,10 +185,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.CREATE_TOPIC_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -230,10 +205,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.UPDATE_TOPIC_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task DeleteTopicAsync(Identifier streamId, Identifier 
topicId, CancellationToken token = default)
@@ -242,10 +214,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.DELETE_TOPIC_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task PurgeTopicAsync(Identifier streamId, Identifier topicId, 
CancellationToken token = default)
@@ -254,10 +223,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.PURGE_TOPIC_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task SendMessagesAsync(MessageSendRequest request,
@@ -272,7 +238,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         //TODO - explore making fields of Message class mutable, so there is 
no need to create em from scratch
         if (encryptor is not null)
         {
-            for (var i = 0; i < request.Messages.Count || 
token.IsCancellationRequested; i++)
+            for (var i = 0; i < request.Messages.Count; i++)
             {
                 request.Messages[i] = request.Messages[i] with { Payload = 
encryptor(request.Messages[i].Payload) };
             }
@@ -345,62 +311,30 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         CancellationToken token = default)
     {
         var message = TcpContracts.FlushUnsavedBuffer(streamId, topicId, 
partitionId, fsync);
-        ;
+
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.FLUSH_UNSAVED_BUFFER_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task<PolledMessages<TMessage>> 
PollMessagesAsync<TMessage>(MessageFetchRequest request,
         Func<byte[], TMessage> serializer, Func<byte[], byte[]>? decryptor = 
null, CancellationToken token = default)
     {
-        await SendFetchMessagesRequestPayload(request.Consumer, 
request.StreamId, request.TopicId,
-            request.PollingStrategy,
-            request.Count, request.AutoCommit, request.PartitionId, token);
-        IMemoryOwner<byte> buffer = 
MemoryPool<byte>.Shared.Rent(BufferSizes.EXPECTED_RESPONSE_SIZE);
-        try
-        {
-            await 
_stream.ReadAsync(buffer.Memory[..BufferSizes.EXPECTED_RESPONSE_SIZE], token);
-            var response = 
TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer.Memory.Span);
-            if (response.Status != 0)
-            {
-                if (response.Length == 0)
-                {
-                    throw new InvalidResponseException($"Invalid response 
status code: {response.Status}");
-                }
-
-                var errorBuffer = new byte[response.Length];
-                await _stream.ReadAsync(errorBuffer, token);
-                throw new 
InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
-            }
+        var messageBufferSize = CalculateMessageBufferSize(request.StreamId, 
request.TopicId, request.Consumer);
+        var payloadBufferSize = CalculatePayloadBufferSize(messageBufferSize);
+        var message = new byte[messageBufferSize];
+        var payload = new byte[payloadBufferSize];
 
-            if (response.Length <= 1)
-            {
-                return PolledMessages<TMessage>.Empty;
-            }
+        TcpContracts.GetMessages(message.AsSpan()[..messageBufferSize], 
request.Consumer, request.StreamId,
+            request.TopicId,
+            request.PollingStrategy, request.Count, request.AutoCommit, 
request.PartitionId);
+        TcpMessageStreamHelpers.CreatePayload(payload, 
message.AsSpan()[..messageBufferSize],
+            CommandCodes.POLL_MESSAGES_CODE);
 
-            IMemoryOwner<byte> responseBuffer = 
MemoryPool<byte>.Shared.Rent(response.Length);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
-            try
-            {
-                await 
_stream.ReadAsync(responseBuffer.Memory[..response.Length], token);
-                PolledMessages<TMessage> result = BinaryMapper.MapMessages(
-                    responseBuffer.Memory.Span[..response.Length], serializer, 
decryptor);
-                return result;
-            }
-            finally
-            {
-                responseBuffer.Dispose();
-            }
-        }
-        finally
-        {
-            buffer.Dispose();
-        }
+        return BinaryMapper.MapMessages(responseBuffer, serializer, decryptor);
     }
 
     public async IAsyncEnumerable<MessageResponse<TMessage>> 
PollMessagesAsync<TMessage>(PollMessagesRequest request,
@@ -461,50 +395,19 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         Func<byte[], byte[]>? decryptor = null,
         CancellationToken token = default)
     {
-        await SendFetchMessagesRequestPayload(request.Consumer, 
request.StreamId, request.TopicId,
-            request.PollingStrategy,
-            request.Count, request.AutoCommit, request.PartitionId, token);
-
-        var buffer = 
ArrayPool<byte>.Shared.Rent(BufferSizes.EXPECTED_RESPONSE_SIZE);
-        try
-        {
-            await 
_stream.ReadAsync(buffer.AsMemory()[..BufferSizes.EXPECTED_RESPONSE_SIZE], 
token);
-
-            var response = 
TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
-            if (response.Status != 0)
-            {
-                if (response.Length == 0)
-                {
-                    throw new InvalidResponseException($"Invalid response 
status code: {response.Status}");
-                }
-
-                var errorBuffer = new byte[response.Length];
-                await _stream.ReadAsync(errorBuffer, token);
-                throw new 
InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
-            }
+        var messageBufferSize = CalculateMessageBufferSize(request.StreamId, 
request.TopicId, request.Consumer);
+        var payloadBufferSize = CalculatePayloadBufferSize(messageBufferSize);
+        var message = new byte[messageBufferSize];
+        var payload = new byte[payloadBufferSize];
 
-            if (response.Length <= 1)
-            {
-                return PolledMessages.Empty;
-            }
+        TcpContracts.GetMessages(message.AsSpan()[..messageBufferSize], 
request.Consumer, request.StreamId,
+            request.TopicId, request.PollingStrategy, request.Count, 
request.AutoCommit, request.PartitionId);
+        TcpMessageStreamHelpers.CreatePayload(payload, 
message.AsSpan()[..messageBufferSize],
+            CommandCodes.POLL_MESSAGES_CODE);
 
-            var responseBuffer = ArrayPool<byte>.Shared.Rent(response.Length);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
-            try
-            {
-                await 
_stream.ReadAsync(responseBuffer.AsMemory()[..response.Length], token);
-                var result = 
BinaryMapper.MapMessages(responseBuffer.AsSpan()[..response.Length], decryptor);
-                return result;
-            }
-            finally
-            {
-                ArrayPool<byte>.Shared.Return(responseBuffer);
-            }
-        }
-        finally
-        {
-            ArrayPool<byte>.Shared.Return(buffer);
-        }
+        return BinaryMapper.MapMessages(responseBuffer, decryptor);
     }
 
     public async Task StoreOffsetAsync(Consumer consumer, Identifier streamId, 
Identifier topicId, ulong offset,
@@ -514,10 +417,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.STORE_CONSUMER_OFFSET_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task<OffsetResponse?> GetOffsetAsync(Consumer consumer, 
Identifier streamId, Identifier topicId,
@@ -527,10 +427,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_CONSUMER_OFFSET_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -547,10 +444,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.DELETE_CONSUMER_OFFSET_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task<IReadOnlyList<ConsumerGroupResponse>> 
GetConsumerGroupsAsync(Identifier streamId,
@@ -561,10 +455,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_CONSUMER_GROUPS_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -581,10 +472,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_CONSUMER_GROUP_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -601,10 +489,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.CREATE_CONSUMER_GROUP_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -621,10 +506,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.DELETE_CONSUMER_GROUP_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task JoinConsumerGroupAsync(Identifier streamId, Identifier 
topicId, Identifier groupId,
@@ -634,10 +516,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.JOIN_CONSUMER_GROUP_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task LeaveConsumerGroupAsync(Identifier streamId, Identifier 
topicId, Identifier groupId,
@@ -647,10 +526,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.LEAVE_CONSUMER_GROUP_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task DeletePartitionsAsync(Identifier streamId, Identifier 
topicId, uint partitionsCount,
@@ -660,10 +536,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.DELETE_PARTITIONS_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task CreatePartitionsAsync(Identifier streamId, Identifier 
topicId, uint partitionsCount,
@@ -673,10 +546,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.CREATE_PARTITIONS_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task<ClientResponse?> GetMeAsync(CancellationToken token = 
default)
@@ -685,10 +555,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_ME_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -704,10 +571,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_STATS_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -723,10 +587,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.PING_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task<IReadOnlyList<ClientResponse>> 
GetClientsAsync(CancellationToken token = default)
@@ -735,10 +596,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_CLIENTS_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -754,10 +612,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_CLIENT_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -773,10 +628,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_USER_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -792,10 +644,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_USERS_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -812,10 +661,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.CREATE_USER_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -831,10 +677,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.DELETE_USER_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task UpdateUser(Identifier userId, string? userName = null, 
UserStatus? status = null,
@@ -844,10 +687,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.UPDATE_USER_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task UpdatePermissions(Identifier userId, Permissions? 
permissions = null,
@@ -857,10 +697,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.UPDATE_PERMISSIONS_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task ChangePassword(Identifier userId, string 
currentPassword, string newPassword,
@@ -870,10 +707,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.CHANGE_PASSWORD_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task<AuthResponse?> LoginUser(string userName, string 
password, CancellationToken token = default)
@@ -882,10 +716,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.LOGIN_USER_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length <= 0)
         {
@@ -894,7 +725,6 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
 
         var userId = 
BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.AsSpan()[..responseBuffer.Length]);
 
-        //TODO: Figure out how to solve this workaround about default of 
TokenInfo
         return new AuthResponse(userId, null);
     }
 
@@ -904,10 +734,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.LOGOUT_USER_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task<IReadOnlyList<PersonalAccessTokenResponse>> 
GetPersonalAccessTokensAsync(
@@ -917,10 +744,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.GET_PERSONAL_ACCESS_TOKENS_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -937,10 +761,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.CREATE_PERSONAL_ACCESS_TOKEN_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        var responseBuffer = await GetMessageAsync(token);
+        var responseBuffer = await SendWithResponseAsync(payload, token);
 
         if (responseBuffer.Length == 0)
         {
@@ -956,10 +777,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.DELETE_PERSONAL_ACCESS_TOKEN_CODE);
 
-        await _stream.SendAsync(payload, token);
-        await _stream.FlushAsync(token);
-
-        await CheckResponseAsync(token);
+        await SendWithResponseAsync(payload, token);
     }
 
     public async Task<AuthResponse?> LoginWithPersonalAccessToken(string 
token, CancellationToken ct = default)
@@ -968,10 +786,7 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH + 
message.Length];
         TcpMessageStreamHelpers.CreatePayload(payload, message, 
CommandCodes.LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE);
 
-        await _stream.SendAsync(payload, ct);
-        await _stream.FlushAsync(ct);
-
-        var responseBuffer = await GetMessageAsync(ct);
+        var responseBuffer = await SendWithResponseAsync(payload, ct);
 
         if (responseBuffer.Length <= 1)
         {
@@ -1019,93 +834,50 @@ public sealed class TcpMessageStream : IIggyClient, 
IDisposable
         writer.Complete();
     }
 
-    private async Task SendFetchMessagesRequestPayload(Consumer consumer, 
Identifier streamId, Identifier topicId,
-        PollingStrategy pollingStrategy,
-        int count, bool autoCommit, uint? partitionId, CancellationToken token)
+    private async Task<byte[]> SendWithResponseAsync(byte[] payload, 
CancellationToken token = default)
     {
-        var messageBufferSize = CalculateMessageBufferSize(streamId, topicId, 
consumer);
-        var payloadBufferSize = CalculatePayloadBufferSize(messageBufferSize);
-        var message = ArrayPool<byte>.Shared.Rent(messageBufferSize);
-        var payload = ArrayPool<byte>.Shared.Rent(payloadBufferSize);
-
         try
         {
-            TcpContracts.GetMessages(message.AsSpan()[..messageBufferSize], 
consumer, streamId, topicId,
-                pollingStrategy, count, autoCommit, partitionId);
-            TcpMessageStreamHelpers.CreatePayload(payload, 
message.AsSpan()[..messageBufferSize],
-                CommandCodes.POLL_MESSAGES_CODE);
-
-            await _stream.SendAsync(payload.AsMemory()[..payloadBufferSize], 
token);
-        }
-        finally
-        {
-            ArrayPool<byte>.Shared.Return(message);
-            ArrayPool<byte>.Shared.Return(payload);
-        }
-    }
+            await _semaphore.WaitAsync(token);
 
-    private async Task CheckResponseAsync(CancellationToken token = default)
-    {
-        var buffer = new byte[BufferSizes.EXPECTED_RESPONSE_SIZE];
-        var readBytes = await _stream.ReadAsync(buffer, token);
+            await _stream.SendAsync(payload, token);
+            await _stream.FlushAsync(token);
 
-        if (readBytes == 0)
-        {
-            throw new InvalidResponseException("Received empty response from 
server or connection was closed");
-        }
-
-        var response = 
TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
+            var buffer = new byte[BufferSizes.EXPECTED_RESPONSE_SIZE];
+            var readBytes = await _stream.ReadAsync(buffer, token);
 
-        if (response.Status != 0)
-        {
-            if (response.Length == 0)
+            if (readBytes == 0)
             {
-                throw new InvalidResponseException($"Invalid response status 
code: {response.Status}");
+                throw new InvalidResponseException("Received empty response 
from server or connection was closed");
             }
 
-            var errorBuffer = new byte[response.Length];
-            await _stream.ReadAsync(errorBuffer, token);
-            throw new 
InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
-        }
-
-        if (response.Length != 0)
-        {
-            throw new InvalidResponseException("Expected response length to be 
0, but got " + response.Length);
-        }
-    }
-
-    private async Task<byte[]> GetMessageAsync(CancellationToken token = 
default)
-    {
-        var buffer = new byte[BufferSizes.EXPECTED_RESPONSE_SIZE];
-        var readBytes = await _stream.ReadAsync(buffer, token);
+            var response = 
TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
 
-        if (readBytes == 0)
-        {
-            throw new InvalidResponseException("Received empty response from 
server or connection was closed");
-        }
+            if (response.Status != 0)
+            {
+                if (response.Length == 0)
+                {
+                    throw new InvalidResponseException($"Invalid response 
status code: {response.Status}");
+                }
 
-        var response = 
TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
+                var errorBuffer = new byte[response.Length];
+                await _stream.ReadAsync(errorBuffer, token);
+                throw new 
InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
+            }
 
-        if (response.Status != 0)
-        {
             if (response.Length == 0)
             {
-                throw new InvalidResponseException($"Invalid response status 
code: {response.Status}");
+                return [];
             }
 
-            var errorBuffer = new byte[response.Length];
-            await _stream.ReadAsync(errorBuffer, token);
-            throw new 
InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
+            var responseBuffer = new byte[response.Length];
+            await _stream.ReadAsync(responseBuffer, token);
+            return responseBuffer;
         }
-
-        if (response.Length == 0)
+        finally
         {
-            return [];
+            _semaphore.Release();
         }
-
-        var responseBuffer = new byte[response.Length];
-        await _stream.ReadAsync(responseBuffer, token);
-        return responseBuffer;
     }
 
     private static int CalculatePayloadBufferSize(int messageBufferSize)
diff --git a/foreign/csharp/README.md b/foreign/csharp/README.md
index f256d2ec..24eaef7f 100644
--- a/foreign/csharp/README.md
+++ b/foreign/csharp/README.md
@@ -405,6 +405,8 @@ Integration tests are located in 
`Iggy_SDK/Iggy_Sample_Producer/IntegrationTests
 Tests can be run against a dockerized Iggy server with TestContainers or local 
Iggy server.
 To run with a local Iggy server, the environment variable `IGGY_SERVER_HOST` 
needs to be set.
 
-## TODO
+## ROADMAP - TODO
 
-- Add support for `ASP.NET Core` Dependency Injection
+- [ ] Consumer/Publisher client - WIP
+- [ ] Error handling with status codes and descriptions
+- [ ] Add support for `ASP.NET Core` Dependency Injection


Reply via email to