This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 5c73cc6d refactor(csharp): use semaphore in tcp implementation (#2179)
5c73cc6d is described below
commit 5c73cc6dcf540bbb6b2a57c1c4d4b1cdfd5d6a80
Author: Ćukasz Zborek <[email protected]>
AuthorDate: Sat Sep 27 19:02:53 2025 +0200
refactor(csharp): use semaphore in tcp implementation (#2179)
Refactor the current TCP implementation to use a semaphore.
This will ensure that only one thread will use one client for TCP
communication at the same time.
---
foreign/csharp/DEPENDENCIES.md | 6 +-
foreign/csharp/Directory.Packages.props | 6 +-
.../Iggy_SDK.Tests.Integration/OffsetTests.cs | 15 +
.../csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs | 4 +-
.../IggyClient/Implementations/TcpMessageStream.cs | 414 +++++----------------
foreign/csharp/README.md | 6 +-
6 files changed, 120 insertions(+), 331 deletions(-)
diff --git a/foreign/csharp/DEPENDENCIES.md b/foreign/csharp/DEPENDENCIES.md
index c0197206..f39b6a96 100644
--- a/foreign/csharp/DEPENDENCIES.md
+++ b/foreign/csharp/DEPENDENCIES.md
@@ -9,10 +9,10 @@ Microsoft.NET.Test.Sdk: "17.14.1", "MIT",
Microsoft.Testing.Extensions.CodeCoverage: "17.14.2", "MIT",
Microsoft.Testing.Extensions.TrxReport: "1.8.2", "MIT",
Moq: "4.20.72", "BSD-3-Clause",
-Reqnroll.xUnit: "2.4.1", "BSD-3-Clause",
+Reqnroll.xUnit: "3.0.3", "BSD-3-Clause",
Shouldly: "4.3.0", "BSD-3-Clause",
System.IO.Hashing: "8.0.0", "MIT",
-Testcontainers: "4.6.0", "MIT",
-TUnit: "0.57.65", "MIT",
+Testcontainers: "4.7.0", "MIT",
+TUnit: "0.58.3", "MIT",
xunit: "2.9.3", "Apache-2.0",
xunit.runner.visualstudio: "3.1.3", "Apache-2.0",
diff --git a/foreign/csharp/Directory.Packages.props
b/foreign/csharp/Directory.Packages.props
index 67d869cf..946b2959 100644
--- a/foreign/csharp/Directory.Packages.props
+++ b/foreign/csharp/Directory.Packages.props
@@ -11,13 +11,13 @@
<PackageVersion Include="Microsoft.Extensions.Logging.Console"
Version="8.0.1" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
<PackageVersion Include="Microsoft.Testing.Extensions.CodeCoverage"
Version="17.14.2" />
- <PackageVersion Include="Microsoft.Testing.Extensions.TrxReport"
Version="1.8.2" />
+ <PackageVersion Include="Microsoft.Testing.Extensions.TrxReport"
Version="1.8.4" />
<PackageVersion Include="Moq" Version="4.20.72" />
- <PackageVersion Include="Reqnroll.xUnit" Version="2.4.1" />
+ <PackageVersion Include="Reqnroll.xUnit" Version="3.0.3" />
<PackageVersion Include="Shouldly" Version="4.3.0" />
<PackageVersion Include="System.IO.Hashing" Version="8.0.0" />
<PackageVersion Include="Testcontainers" Version="4.7.0" />
- <PackageVersion Include="TUnit" Version="0.57.65" />
+ <PackageVersion Include="TUnit" Version="0.58.3" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.4">
<PrivateAssets>all</PrivateAssets>
diff --git a/foreign/csharp/Iggy_SDK.Tests.Integration/OffsetTests.cs
b/foreign/csharp/Iggy_SDK.Tests.Integration/OffsetTests.cs
index 70136132..9a22aed7 100644
--- a/foreign/csharp/Iggy_SDK.Tests.Integration/OffsetTests.cs
+++ b/foreign/csharp/Iggy_SDK.Tests.Integration/OffsetTests.cs
@@ -82,4 +82,19 @@ public class OffsetTests
offset.PartitionId.ShouldBe(1);
offset.CurrentOffset.ShouldBe(3u);
}
+
+ [Test]
+
[DependsOn(nameof(StoreOffset_ConsumerGroup_Should_StoreOffset_Successfully))]
+
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
+ public async Task
GetOffset_ConsumerGroup_ByName_Should_GetOffset_Successfully(Protocol protocol)
+ {
+ var offset = await
Fixture.Clients[protocol].GetOffsetAsync(Consumer.Group("test_consumer_group"),
+ Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)),
Identifier.String(Fixture.TopicRequest.Name),
+ 1);
+
+ offset.ShouldNotBeNull();
+ offset.StoredOffset.ShouldBe(SetOffset);
+ offset.PartitionId.ShouldBe(1);
+ offset.CurrentOffset.ShouldBe(3u);
+ }
}
diff --git a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
index 1b5b3b8a..631fc523 100644
--- a/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
+++ b/foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs
@@ -749,7 +749,7 @@ internal static class TcpContracts
bytes.WriteBytesFromIdentifier(consumer.Id, 1);
var position = 1 + consumer.Id.Length + 2;
bytes.WriteBytesFromStreamAndTopicIdentifiers(streamId, topicId,
position);
- position = 7 + 2 + streamId.Length + 2 + topicId.Length;
+ position += + 2 + streamId.Length + 2 + topicId.Length;
BinaryPrimitives.WriteUInt32LittleEndian(bytes[position..(position +
4)], partitionId ?? 0);
return bytes.ToArray();
}
@@ -809,4 +809,4 @@ internal static class TcpContracts
BinaryPrimitives.WriteUInt32LittleEndian(bytes[position..(position +
4)], partitionId ?? 0);
return bytes.ToArray();
}
-}
\ No newline at end of file
+}
diff --git
a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
index 3b30d652..4294409d 100644
--- a/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
+++ b/foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-using System.Buffers;
using System.Buffers.Binary;
using System.IO.Hashing;
using System.Runtime.CompilerServices;
@@ -44,6 +43,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
private readonly ILogger<TcpMessageStream> _logger;
private readonly IMessageInvoker? _messageInvoker;
private readonly MessagePollingSettings _messagePollingSettings;
+ private readonly SemaphoreSlim _semaphore;
private readonly IConnectionStream _stream;
internal TcpMessageStream(IConnectionStream stream,
Channel<MessageSendRequest>? channel,
@@ -55,12 +55,14 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
_messagePollingSettings = messagePollingSettings;
_messageInvoker = messageInvoker;
_logger = loggerFactory.CreateLogger<TcpMessageStream>();
+ _semaphore = new SemaphoreSlim(1, 1);
}
public void Dispose()
{
_stream.Close();
_stream.Dispose();
+ _semaphore.Dispose();
}
public async Task<StreamResponse?> CreateStreamAsync(string name, uint?
streamId, CancellationToken token = default)
@@ -69,10 +71,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.CREATE_STREAM_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -88,10 +87,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_STREAM_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -107,10 +103,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_STREAMS_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -126,10 +119,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.UPDATE_STREAM_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task PurgeStreamAsync(Identifier streamId, CancellationToken
token = default)
@@ -138,10 +128,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.PURGE_STREAM_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task DeleteStreamAsync(Identifier streamId, CancellationToken
token = default)
@@ -150,10 +137,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.DELETE_STREAM_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task<IReadOnlyList<TopicResponse>> GetTopicsAsync(Identifier
streamId,
@@ -163,10 +147,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_TOPICS_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -183,10 +164,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_TOPIC_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -207,10 +185,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.CREATE_TOPIC_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -230,10 +205,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.UPDATE_TOPIC_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task DeleteTopicAsync(Identifier streamId, Identifier
topicId, CancellationToken token = default)
@@ -242,10 +214,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.DELETE_TOPIC_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task PurgeTopicAsync(Identifier streamId, Identifier topicId,
CancellationToken token = default)
@@ -254,10 +223,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.PURGE_TOPIC_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task SendMessagesAsync(MessageSendRequest request,
@@ -272,7 +238,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
//TODO - explore making fields of Message class mutable, so there is
no need to create em from scratch
if (encryptor is not null)
{
- for (var i = 0; i < request.Messages.Count ||
token.IsCancellationRequested; i++)
+ for (var i = 0; i < request.Messages.Count; i++)
{
request.Messages[i] = request.Messages[i] with { Payload =
encryptor(request.Messages[i].Payload) };
}
@@ -345,62 +311,30 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
CancellationToken token = default)
{
var message = TcpContracts.FlushUnsavedBuffer(streamId, topicId,
partitionId, fsync);
- ;
+
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.FLUSH_UNSAVED_BUFFER_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task<PolledMessages<TMessage>>
PollMessagesAsync<TMessage>(MessageFetchRequest request,
Func<byte[], TMessage> serializer, Func<byte[], byte[]>? decryptor =
null, CancellationToken token = default)
{
- await SendFetchMessagesRequestPayload(request.Consumer,
request.StreamId, request.TopicId,
- request.PollingStrategy,
- request.Count, request.AutoCommit, request.PartitionId, token);
- IMemoryOwner<byte> buffer =
MemoryPool<byte>.Shared.Rent(BufferSizes.EXPECTED_RESPONSE_SIZE);
- try
- {
- await
_stream.ReadAsync(buffer.Memory[..BufferSizes.EXPECTED_RESPONSE_SIZE], token);
- var response =
TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer.Memory.Span);
- if (response.Status != 0)
- {
- if (response.Length == 0)
- {
- throw new InvalidResponseException($"Invalid response
status code: {response.Status}");
- }
-
- var errorBuffer = new byte[response.Length];
- await _stream.ReadAsync(errorBuffer, token);
- throw new
InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
- }
+ var messageBufferSize = CalculateMessageBufferSize(request.StreamId,
request.TopicId, request.Consumer);
+ var payloadBufferSize = CalculatePayloadBufferSize(messageBufferSize);
+ var message = new byte[messageBufferSize];
+ var payload = new byte[payloadBufferSize];
- if (response.Length <= 1)
- {
- return PolledMessages<TMessage>.Empty;
- }
+ TcpContracts.GetMessages(message.AsSpan()[..messageBufferSize],
request.Consumer, request.StreamId,
+ request.TopicId,
+ request.PollingStrategy, request.Count, request.AutoCommit,
request.PartitionId);
+ TcpMessageStreamHelpers.CreatePayload(payload,
message.AsSpan()[..messageBufferSize],
+ CommandCodes.POLL_MESSAGES_CODE);
- IMemoryOwner<byte> responseBuffer =
MemoryPool<byte>.Shared.Rent(response.Length);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
- try
- {
- await
_stream.ReadAsync(responseBuffer.Memory[..response.Length], token);
- PolledMessages<TMessage> result = BinaryMapper.MapMessages(
- responseBuffer.Memory.Span[..response.Length], serializer,
decryptor);
- return result;
- }
- finally
- {
- responseBuffer.Dispose();
- }
- }
- finally
- {
- buffer.Dispose();
- }
+ return BinaryMapper.MapMessages(responseBuffer, serializer, decryptor);
}
public async IAsyncEnumerable<MessageResponse<TMessage>>
PollMessagesAsync<TMessage>(PollMessagesRequest request,
@@ -461,50 +395,19 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
Func<byte[], byte[]>? decryptor = null,
CancellationToken token = default)
{
- await SendFetchMessagesRequestPayload(request.Consumer,
request.StreamId, request.TopicId,
- request.PollingStrategy,
- request.Count, request.AutoCommit, request.PartitionId, token);
-
- var buffer =
ArrayPool<byte>.Shared.Rent(BufferSizes.EXPECTED_RESPONSE_SIZE);
- try
- {
- await
_stream.ReadAsync(buffer.AsMemory()[..BufferSizes.EXPECTED_RESPONSE_SIZE],
token);
-
- var response =
TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
- if (response.Status != 0)
- {
- if (response.Length == 0)
- {
- throw new InvalidResponseException($"Invalid response
status code: {response.Status}");
- }
-
- var errorBuffer = new byte[response.Length];
- await _stream.ReadAsync(errorBuffer, token);
- throw new
InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
- }
+ var messageBufferSize = CalculateMessageBufferSize(request.StreamId,
request.TopicId, request.Consumer);
+ var payloadBufferSize = CalculatePayloadBufferSize(messageBufferSize);
+ var message = new byte[messageBufferSize];
+ var payload = new byte[payloadBufferSize];
- if (response.Length <= 1)
- {
- return PolledMessages.Empty;
- }
+ TcpContracts.GetMessages(message.AsSpan()[..messageBufferSize],
request.Consumer, request.StreamId,
+ request.TopicId, request.PollingStrategy, request.Count,
request.AutoCommit, request.PartitionId);
+ TcpMessageStreamHelpers.CreatePayload(payload,
message.AsSpan()[..messageBufferSize],
+ CommandCodes.POLL_MESSAGES_CODE);
- var responseBuffer = ArrayPool<byte>.Shared.Rent(response.Length);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
- try
- {
- await
_stream.ReadAsync(responseBuffer.AsMemory()[..response.Length], token);
- var result =
BinaryMapper.MapMessages(responseBuffer.AsSpan()[..response.Length], decryptor);
- return result;
- }
- finally
- {
- ArrayPool<byte>.Shared.Return(responseBuffer);
- }
- }
- finally
- {
- ArrayPool<byte>.Shared.Return(buffer);
- }
+ return BinaryMapper.MapMessages(responseBuffer, decryptor);
}
public async Task StoreOffsetAsync(Consumer consumer, Identifier streamId,
Identifier topicId, ulong offset,
@@ -514,10 +417,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.STORE_CONSUMER_OFFSET_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task<OffsetResponse?> GetOffsetAsync(Consumer consumer,
Identifier streamId, Identifier topicId,
@@ -527,10 +427,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_CONSUMER_OFFSET_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -547,10 +444,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.DELETE_CONSUMER_OFFSET_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task<IReadOnlyList<ConsumerGroupResponse>>
GetConsumerGroupsAsync(Identifier streamId,
@@ -561,10 +455,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_CONSUMER_GROUPS_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -581,10 +472,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_CONSUMER_GROUP_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -601,10 +489,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.CREATE_CONSUMER_GROUP_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -621,10 +506,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.DELETE_CONSUMER_GROUP_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task JoinConsumerGroupAsync(Identifier streamId, Identifier
topicId, Identifier groupId,
@@ -634,10 +516,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.JOIN_CONSUMER_GROUP_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task LeaveConsumerGroupAsync(Identifier streamId, Identifier
topicId, Identifier groupId,
@@ -647,10 +526,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.LEAVE_CONSUMER_GROUP_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task DeletePartitionsAsync(Identifier streamId, Identifier
topicId, uint partitionsCount,
@@ -660,10 +536,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.DELETE_PARTITIONS_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task CreatePartitionsAsync(Identifier streamId, Identifier
topicId, uint partitionsCount,
@@ -673,10 +546,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.CREATE_PARTITIONS_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task<ClientResponse?> GetMeAsync(CancellationToken token =
default)
@@ -685,10 +555,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_ME_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -704,10 +571,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_STATS_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -723,10 +587,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.PING_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task<IReadOnlyList<ClientResponse>>
GetClientsAsync(CancellationToken token = default)
@@ -735,10 +596,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_CLIENTS_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -754,10 +612,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_CLIENT_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -773,10 +628,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_USER_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -792,10 +644,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_USERS_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -812,10 +661,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.CREATE_USER_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -831,10 +677,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.DELETE_USER_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task UpdateUser(Identifier userId, string? userName = null,
UserStatus? status = null,
@@ -844,10 +687,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.UPDATE_USER_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task UpdatePermissions(Identifier userId, Permissions?
permissions = null,
@@ -857,10 +697,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.UPDATE_PERMISSIONS_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task ChangePassword(Identifier userId, string
currentPassword, string newPassword,
@@ -870,10 +707,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.CHANGE_PASSWORD_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task<AuthResponse?> LoginUser(string userName, string
password, CancellationToken token = default)
@@ -882,10 +716,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.LOGIN_USER_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length <= 0)
{
@@ -894,7 +725,6 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var userId =
BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.AsSpan()[..responseBuffer.Length]);
- //TODO: Figure out how to solve this workaround about default of
TokenInfo
return new AuthResponse(userId, null);
}
@@ -904,10 +734,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.LOGOUT_USER_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task<IReadOnlyList<PersonalAccessTokenResponse>>
GetPersonalAccessTokensAsync(
@@ -917,10 +744,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.GET_PERSONAL_ACCESS_TOKENS_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -937,10 +761,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.CREATE_PERSONAL_ACCESS_TOKEN_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- var responseBuffer = await GetMessageAsync(token);
+ var responseBuffer = await SendWithResponseAsync(payload, token);
if (responseBuffer.Length == 0)
{
@@ -956,10 +777,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.DELETE_PERSONAL_ACCESS_TOKEN_CODE);
- await _stream.SendAsync(payload, token);
- await _stream.FlushAsync(token);
-
- await CheckResponseAsync(token);
+ await SendWithResponseAsync(payload, token);
}
public async Task<AuthResponse?> LoginWithPersonalAccessToken(string
token, CancellationToken ct = default)
@@ -968,10 +786,7 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
var payload = new byte[4 + BufferSizes.INITIAL_BYTES_LENGTH +
message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message,
CommandCodes.LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE);
- await _stream.SendAsync(payload, ct);
- await _stream.FlushAsync(ct);
-
- var responseBuffer = await GetMessageAsync(ct);
+ var responseBuffer = await SendWithResponseAsync(payload, ct);
if (responseBuffer.Length <= 1)
{
@@ -1019,93 +834,50 @@ public sealed class TcpMessageStream : IIggyClient,
IDisposable
writer.Complete();
}
- private async Task SendFetchMessagesRequestPayload(Consumer consumer,
Identifier streamId, Identifier topicId,
- PollingStrategy pollingStrategy,
- int count, bool autoCommit, uint? partitionId, CancellationToken token)
+ private async Task<byte[]> SendWithResponseAsync(byte[] payload,
CancellationToken token = default)
{
- var messageBufferSize = CalculateMessageBufferSize(streamId, topicId,
consumer);
- var payloadBufferSize = CalculatePayloadBufferSize(messageBufferSize);
- var message = ArrayPool<byte>.Shared.Rent(messageBufferSize);
- var payload = ArrayPool<byte>.Shared.Rent(payloadBufferSize);
-
try
{
- TcpContracts.GetMessages(message.AsSpan()[..messageBufferSize],
consumer, streamId, topicId,
- pollingStrategy, count, autoCommit, partitionId);
- TcpMessageStreamHelpers.CreatePayload(payload,
message.AsSpan()[..messageBufferSize],
- CommandCodes.POLL_MESSAGES_CODE);
-
- await _stream.SendAsync(payload.AsMemory()[..payloadBufferSize],
token);
- }
- finally
- {
- ArrayPool<byte>.Shared.Return(message);
- ArrayPool<byte>.Shared.Return(payload);
- }
- }
+ await _semaphore.WaitAsync(token);
- private async Task CheckResponseAsync(CancellationToken token = default)
- {
- var buffer = new byte[BufferSizes.EXPECTED_RESPONSE_SIZE];
- var readBytes = await _stream.ReadAsync(buffer, token);
+ await _stream.SendAsync(payload, token);
+ await _stream.FlushAsync(token);
- if (readBytes == 0)
- {
- throw new InvalidResponseException("Received empty response from
server or connection was closed");
- }
-
- var response =
TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
+ var buffer = new byte[BufferSizes.EXPECTED_RESPONSE_SIZE];
+ var readBytes = await _stream.ReadAsync(buffer, token);
- if (response.Status != 0)
- {
- if (response.Length == 0)
+ if (readBytes == 0)
{
- throw new InvalidResponseException($"Invalid response status
code: {response.Status}");
+ throw new InvalidResponseException("Received empty response
from server or connection was closed");
}
- var errorBuffer = new byte[response.Length];
- await _stream.ReadAsync(errorBuffer, token);
- throw new
InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
- }
-
- if (response.Length != 0)
- {
- throw new InvalidResponseException("Expected response length to be
0, but got " + response.Length);
- }
- }
-
- private async Task<byte[]> GetMessageAsync(CancellationToken token =
default)
- {
- var buffer = new byte[BufferSizes.EXPECTED_RESPONSE_SIZE];
- var readBytes = await _stream.ReadAsync(buffer, token);
+ var response =
TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
- if (readBytes == 0)
- {
- throw new InvalidResponseException("Received empty response from
server or connection was closed");
- }
+ if (response.Status != 0)
+ {
+ if (response.Length == 0)
+ {
+ throw new InvalidResponseException($"Invalid response
status code: {response.Status}");
+ }
- var response =
TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
+ var errorBuffer = new byte[response.Length];
+ await _stream.ReadAsync(errorBuffer, token);
+ throw new
InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
+ }
- if (response.Status != 0)
- {
if (response.Length == 0)
{
- throw new InvalidResponseException($"Invalid response status
code: {response.Status}");
+ return [];
}
- var errorBuffer = new byte[response.Length];
- await _stream.ReadAsync(errorBuffer, token);
- throw new
InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
+ var responseBuffer = new byte[response.Length];
+ await _stream.ReadAsync(responseBuffer, token);
+ return responseBuffer;
}
-
- if (response.Length == 0)
+ finally
{
- return [];
+ _semaphore.Release();
}
-
- var responseBuffer = new byte[response.Length];
- await _stream.ReadAsync(responseBuffer, token);
- return responseBuffer;
}
private static int CalculatePayloadBufferSize(int messageBufferSize)
diff --git a/foreign/csharp/README.md b/foreign/csharp/README.md
index f256d2ec..24eaef7f 100644
--- a/foreign/csharp/README.md
+++ b/foreign/csharp/README.md
@@ -405,6 +405,8 @@ Integration tests are located in
`Iggy_SDK/Iggy_Sample_Producer/IntegrationTests
Tests can be run against a dockerized Iggy server with TestContainers or local
Iggy server.
To run with a local Iggy server, the environment variable `IGGY_SERVER_HOST`
needs to be set.
-## TODO
+## ROADMAP - TODO
-- Add support for `ASP.NET Core` Dependency Injection
+- [ ] Consumer/Publisher client - WIP
+- [ ] Error handling with status codes and descriptions
+- [ ] Add support for `ASP.NET Core` Dependency Injection