This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6950c11 Make ready for 1.0.0 and fix seek problem (broker is sending close command before success) 6950c11 is described below commit 6950c11c6190ad7e460c2b879224f64b43b480cf Author: Daniel Blankensteiner <d...@vmail.dk> AuthorDate: Wed Mar 17 15:59:11 2021 +0100 Make ready for 1.0.0 and fix seek problem (broker is sending close command before success) --- .asf.yaml | 2 +- CHANGELOG.md | 4 ++-- README.md | 10 +++++++++- src/DotPulsar/DotPulsar.csproj | 4 ++-- src/DotPulsar/Extensions/ConsumerExtensions.cs | 2 +- src/DotPulsar/Internal/Abstractions/IRequest.cs | 2 ++ src/DotPulsar/Internal/RequestResponseHandler.cs | 9 +++++++-- src/DotPulsar/Internal/Requests/ConnectRequest.cs | 4 ++++ src/DotPulsar/Internal/Requests/SendRequest.cs | 4 ++++ src/DotPulsar/Internal/Requests/StandardRequest.cs | 18 ++++++++++++------ 10 files changed, 44 insertions(+), 15 deletions(-) diff --git a/.asf.yaml b/.asf.yaml index 319738d..d4007d6 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -18,7 +18,7 @@ # github: - description: "The official .NET/C# client library for Apache Pulsar" + description: "The official .NET client library for Apache Pulsar" homepage: https://pulsar.apache.org/ labels: - pulsar diff --git a/CHANGELOG.md b/CHANGELOG.md index f6246fa..b4b558b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,14 +4,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
-## [Unreleased] +## [1.0.0] - 2021-03-17 ### Added - A number of resilience, correctness, and performance improvements - The optional listener name can be set via the PulsarClientBuilder - *Experimental*: Added an extension method for IConsumer that will 'Process' and auto-acknowledge messages while creating an Activity (useful for doing tracing) -- Schemas with support for the following types +- Schema support for the following types - Boolean - Bytes (using byte[] and ReadOnlySequence\<byte\>) - String (UTF-8, UTF-16, and US-ASCII) diff --git a/README.md b/README.md index fe7afdc..a4a92ec 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ![CI - Unit](https://github.com/apache/pulsar-dotpulsar/workflows/CI%20-%20Unit/badge.svg) -The official .NET/C# client library for [Apache Pulsar](https://pulsar.apache.org/). +The official .NET client library for [Apache Pulsar](https://pulsar.apache.org/). DotPulsar is written entirely in C# and implements Apache Pulsar's [binary protocol](https://pulsar.apache.org/docs/en/develop-binary-protocol/). 
@@ -65,6 +65,14 @@ For a more in-depth tour of the API, please visit the [Wiki](https://github.com/ - [X] [ZLIB message compression](https://github.com/apache/pulsar-dotpulsar/wiki/Compression) - [X] [ZSTD message compression](https://github.com/apache/pulsar-dotpulsar/wiki/Compression) - [X] [SNAPPY message compression](https://github.com/apache/pulsar-dotpulsar/wiki/Compression) +- [X] Schemas + - Boolean + - Bytes (using byte[] and ReadOnlySequence\<byte\>) + - String (UTF-8, UTF-16, and US-ASCII) + - INT8, INT16, INT32, and INT64 + - Float and Double + - Time (using TimeSpan) + - Timestamp and Date (using DateTime) ## Roadmap diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj index 3f0521a..88b5273 100644 --- a/src/DotPulsar/DotPulsar.csproj +++ b/src/DotPulsar/DotPulsar.csproj @@ -2,7 +2,7 @@ <PropertyGroup> <TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0</TargetFrameworks> - <Version>0.11.0</Version> + <Version>1.0.0</Version> <AssemblyVersion>$(Version)</AssemblyVersion> <FileVersion>$(Version)</FileVersion> <Authors>ApachePulsar,DanskeCommodities,dblank</Authors> @@ -13,7 +13,7 @@ <PackageIcon>PackageIcon.png</PackageIcon> <PackageLicenseExpression>Apache-2.0</PackageLicenseExpression> <PackageReleaseNotes>Please refer to CHANGELOG.md for details</PackageReleaseNotes> - <Description>The official .NET/C# client library for Apache Pulsar</Description> + <Description>The official .NET client library for Apache Pulsar</Description> <GenerateDocumentationFile>true</GenerateDocumentationFile> <PublishRepositoryUrl>true</PublishRepositoryUrl> <IncludeSymbols>true</IncludeSymbols> diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs index 13f2121..f9a78a0 100644 --- a/src/DotPulsar/Extensions/ConsumerExtensions.cs +++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs @@ -40,7 +40,7 @@ namespace DotPulsar.Extensions => await 
consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false); /// <summary> - /// Process and auto-acknowledge a message. + /// Process and auto-acknowledge a message. This is experimental. /// </summary> public static async ValueTask Process<TMessage>( this IConsumer<TMessage> consumer, diff --git a/src/DotPulsar/Internal/Abstractions/IRequest.cs b/src/DotPulsar/Internal/Abstractions/IRequest.cs index 8ea7913..0841e26 100644 --- a/src/DotPulsar/Internal/Abstractions/IRequest.cs +++ b/src/DotPulsar/Internal/Abstractions/IRequest.cs @@ -14,11 +14,13 @@ namespace DotPulsar.Internal.Abstractions { + using DotPulsar.Internal.PulsarApi; using System; public interface IRequest : IEquatable<IRequest> { bool SenderIsProducer(ulong producerId); bool SenderIsConsumer(ulong consumerId); + bool IsCommandType(BaseCommand.Type commandType); } } diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs index 5cbd632..9680d4c 100644 --- a/src/DotPulsar/Internal/RequestResponseHandler.cs +++ b/src/DotPulsar/Internal/RequestResponseHandler.cs @@ -108,7 +108,7 @@ namespace DotPulsar.Internal public Task<BaseCommand> Outgoing(CommandSeek command) { command.RequestId = _requestId.FetchNext(); - return _requests.CreateTask(StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId)); + return _requests.CreateTask(StandardRequest.WithConsumerId(command.RequestId, command.ConsumerId, BaseCommand.Type.Seek)); } public Task<BaseCommand> Outgoing(CommandGetLastMessageId command) @@ -131,7 +131,12 @@ namespace DotPulsar.Internal foreach (var request in requests) { if (request.SenderIsConsumer(command.ConsumerId)) - _requests.Cancel(request); + { + if (request.IsCommandType(BaseCommand.Type.Seek)) + _requests.SetResult(request, new BaseCommand { CommandType = BaseCommand.Type.Success }); + else + _requests.Cancel(request); + } } } diff --git a/src/DotPulsar/Internal/Requests/ConnectRequest.cs 
b/src/DotPulsar/Internal/Requests/ConnectRequest.cs index 5ab0e20..18f977c 100644 --- a/src/DotPulsar/Internal/Requests/ConnectRequest.cs +++ b/src/DotPulsar/Internal/Requests/ConnectRequest.cs @@ -15,6 +15,7 @@ namespace DotPulsar.Internal.Requests { using DotPulsar.Internal.Abstractions; + using DotPulsar.Internal.PulsarApi; using System.Diagnostics.CodeAnalysis; public struct ConnectRequest : IRequest @@ -25,6 +26,9 @@ namespace DotPulsar.Internal.Requests public bool SenderIsProducer(ulong producerId) => false; + public bool IsCommandType(BaseCommand.Type commandType) + => commandType == BaseCommand.Type.Connect; + #if NETSTANDARD2_0 public bool Equals(IRequest other) #else diff --git a/src/DotPulsar/Internal/Requests/SendRequest.cs b/src/DotPulsar/Internal/Requests/SendRequest.cs index 36498b6..6023a88 100644 --- a/src/DotPulsar/Internal/Requests/SendRequest.cs +++ b/src/DotPulsar/Internal/Requests/SendRequest.cs @@ -15,6 +15,7 @@ namespace DotPulsar.Internal.Requests { using DotPulsar.Internal.Abstractions; + using DotPulsar.Internal.PulsarApi; using System; using System.Diagnostics.CodeAnalysis; @@ -35,6 +36,9 @@ namespace DotPulsar.Internal.Requests public bool SenderIsProducer(ulong producerId) => _producerId == producerId; + public bool IsCommandType(BaseCommand.Type commandType) + => commandType == BaseCommand.Type.Send; + #if NETSTANDARD2_0 public bool Equals(IRequest other) #else diff --git a/src/DotPulsar/Internal/Requests/StandardRequest.cs b/src/DotPulsar/Internal/Requests/StandardRequest.cs index 666b227..69191a2 100644 --- a/src/DotPulsar/Internal/Requests/StandardRequest.cs +++ b/src/DotPulsar/Internal/Requests/StandardRequest.cs @@ -15,6 +15,7 @@ namespace DotPulsar.Internal.Requests { using DotPulsar.Internal.Abstractions; + using DotPulsar.Internal.PulsarApi; using System; using System.Diagnostics.CodeAnalysis; @@ -23,22 +24,24 @@ namespace DotPulsar.Internal.Requests private readonly ulong _requestId; private readonly ulong? 
_consumerId; private readonly ulong? _producerId; + private readonly BaseCommand.Type? _commandType; - private StandardRequest(ulong requestId, ulong? consumerId, ulong? producerId) + private StandardRequest(ulong requestId, ulong? consumerId, ulong? producerId, BaseCommand.Type? commandType) { _requestId = requestId; _consumerId = consumerId; _producerId = producerId; + _commandType = commandType; } public static StandardRequest WithRequestId(ulong requestId) - => new(requestId, null, null); + => new(requestId, null, null, null); - public static StandardRequest WithConsumerId(ulong requestId, ulong consumerId) - => new(requestId, consumerId, null); + public static StandardRequest WithConsumerId(ulong requestId, ulong consumerId, BaseCommand.Type? commandType = null) + => new(requestId, consumerId, null, commandType); - public static StandardRequest WithProducerId(ulong requestId, ulong producerId) - => new (requestId, null, producerId); + public static StandardRequest WithProducerId(ulong requestId, ulong producerId, BaseCommand.Type? commandType = null) + => new (requestId, null, producerId, commandType); public bool SenderIsConsumer(ulong consumerId) => _consumerId.HasValue && _consumerId.Value == consumerId; @@ -46,6 +49,9 @@ namespace DotPulsar.Internal.Requests public bool SenderIsProducer(ulong producerId) => _producerId.HasValue && _producerId.Value == producerId; + public bool IsCommandType(BaseCommand.Type commandType) + => _commandType.HasValue && _commandType.Value == commandType; + #if NETSTANDARD2_0 public bool Equals(IRequest other) #else