This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git

The following commit(s) were added to refs/heads/master by this push: new 1c0ed27 Minor cleanup and updated NuGet package 1c0ed27 is described below commit 1c0ed27a8d77633f6396b92759161b3be20844ca Author: Daniel Blankensteiner <d...@vmail.dk> AuthorDate: Mon Jun 21 22:12:55 2021 +0200 Minor cleanup and updated NuGet package --- samples/Producing/Program.cs | 2 +- src/DotPulsar/DotPulsar.csproj | 2 +- src/DotPulsar/Internal/Extensions/CommandExtensions.cs | 2 +- .../Internal/Extensions/MessageMetadataExtensions.cs | 11 ++++++++++- src/DotPulsar/Internal/ProducerBuilder.cs | 3 ++- src/DotPulsar/Internal/ProducerProcess.cs | 2 -- src/DotPulsar/Internal/PulsarClientFactory.cs | 4 +--- src/DotPulsar/ProducerOptions.cs | 2 +- src/DotPulsar/PulsarClient.cs | 2 -- src/DotPulsar/RoundRobinPartitionRouter.cs | 10 ++++------ src/DotPulsar/SinglePartitionRouter.cs | 11 ++++------- 11 files changed, 25 insertions(+), 26 deletions(-) diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs index 9c2ef0e..42a706b 100644 --- a/samples/Producing/Program.cs +++ b/samples/Producing/Program.cs @@ -71,7 +71,7 @@ namespace Producing { ProducerState.Connected => "is connected", ProducerState.Disconnected => "is disconnected", - ProducerState.PartiallyConnected => "has partially connected", + ProducerState.PartiallyConnected => "is partially connected", ProducerState.Closed => "has closed", ProducerState.Faulted => "has faulted", _ => $"has an unknown state '{stateChanged.ProducerState}'" diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj index 9cce4c3..0613e86 100644 --- a/src/DotPulsar/DotPulsar.csproj +++ b/src/DotPulsar/DotPulsar.csproj @@ -23,7 +23,7 @@ <ItemGroup> <PackageReference Include="HashDepot" Version="2.0.3" /> - <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.6" /> + <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.7" /> <PackageReference Include="Microsoft.SourceLink.GitHub" 
Version="1.0.0" PrivateAssets="All" /> <PackageReference Include="protobuf-net" Version="3.0.101" /> <PackageReference Include="System.IO.Pipelines" Version="5.0.1" /> diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs index ffa9295..9cea7cc 100644 --- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs +++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs @@ -192,7 +192,7 @@ namespace DotPulsar.Internal.Extensions }; public static BaseCommand AsBaseCommand(this CommandPartitionedTopicMetadata command) - => new BaseCommand + => new() { CommandType = BaseCommand.Type.PartitionedMetadata, PartitionMetadata = command }; diff --git a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs index 3e558fc..f2bac5f 100644 --- a/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs +++ b/src/DotPulsar/Internal/Extensions/MessageMetadataExtensions.cs @@ -15,6 +15,7 @@ namespace DotPulsar.Internal.Extensions { using System; + using System.Text; using Metadata = PulsarApi.MessageMetadata; public static class MessageMetadataExtensions @@ -47,7 +48,15 @@ namespace DotPulsar.Internal.Extensions // Key public static byte[]? GetKeyAsBytes(this Metadata metadata) - => metadata.PartitionKeyB64Encoded ? Convert.FromBase64String(metadata.PartitionKey) : null; + { + if (metadata.PartitionKey is null) + return null; + + if (metadata.PartitionKeyB64Encoded) + return Convert.FromBase64String(metadata.PartitionKey); + + return Encoding.UTF8.GetBytes(metadata.PartitionKey); + } public static void SetKey(this Metadata metadata, string? 
key) { diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs index 95a7bdc..7470fe9 100644 --- a/src/DotPulsar/Internal/ProducerBuilder.cs +++ b/src/DotPulsar/Internal/ProducerBuilder.cs @@ -85,7 +85,8 @@ namespace DotPulsar.Internal StateChangedHandler = _stateChangedHandler }; - if (_messageRouter != null) options.MessageRouter = _messageRouter; + if (_messageRouter is not null) + options.MessageRouter = _messageRouter; return _pulsarClient.CreateProducer<TMessage>(options); } diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs index e7230fe..36834df 100644 --- a/src/DotPulsar/Internal/ProducerProcess.cs +++ b/src/DotPulsar/Internal/ProducerProcess.cs @@ -79,8 +79,6 @@ namespace DotPulsar.Internal case ProducerState.Faulted: _stateManager.SetState(ProducerState.Faulted); break; - case ProducerState.PartiallyConnected: break; - default: throw new ArgumentOutOfRangeException(); } break; diff --git a/src/DotPulsar/Internal/PulsarClientFactory.cs b/src/DotPulsar/Internal/PulsarClientFactory.cs index fc58aa9..e703f73 100644 --- a/src/DotPulsar/Internal/PulsarClientFactory.cs +++ b/src/DotPulsar/Internal/PulsarClientFactory.cs @@ -21,8 +21,6 @@ namespace DotPulsar.Internal public sealed class PulsarClientFactory { public static PulsarClient CreatePulsarClient(IConnectionPool connectionPool, ProcessManager processManager, IHandleException exceptionHandler, Uri serviceUrl) - { - return new PulsarClient(connectionPool, processManager, exceptionHandler, serviceUrl); - } + => new(connectionPool, processManager, exceptionHandler, serviceUrl); } } diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs index 3e8cd0e..1444198 100644 --- a/src/DotPulsar/ProducerOptions.cs +++ b/src/DotPulsar/ProducerOptions.cs @@ -74,7 +74,7 @@ namespace DotPulsar public string Topic { get; set; } /// <summary> - /// Set the message router. 
The default router is Round Robin partition router. + /// Set the message router. The default router is the Round Robin partition router. /// </summary> public IMessageRouter MessageRouter { get; set; } } diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs index 5415485..453ba26 100644 --- a/src/DotPulsar/PulsarClient.cs +++ b/src/DotPulsar/PulsarClient.cs @@ -105,9 +105,7 @@ namespace DotPulsar ICompressorFactory? compressorFactory = null; if (partitionIndex.HasValue) - { topic = $"{topic}-partition-{partitionIndex}"; - } if (options.CompressionType != CompressionType.None) { diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs b/src/DotPulsar/RoundRobinPartitionRouter.cs index 05595d6..a7da420 100644 --- a/src/DotPulsar/RoundRobinPartitionRouter.cs +++ b/src/DotPulsar/RoundRobinPartitionRouter.cs @@ -16,7 +16,6 @@ namespace DotPulsar { using Abstractions; using HashDepot; - using System.Text; using System.Threading; /// <summary> @@ -31,14 +30,13 @@ namespace DotPulsar private int _partitionIndex = -1; /// <summary> - /// Choose a partition in round robin routig mode + /// Choose a partition in round robin routing mode /// </summary> public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount) { - if (messageMetadata != null && !string.IsNullOrEmpty(messageMetadata.Key)) - { - return (int) MurmurHash3.Hash32(Encoding.UTF8.GetBytes(messageMetadata.Key ?? 
string.Empty), 0) % partitionsCount; - } + var keyBytes = messageMetadata?.KeyBytes; + if (keyBytes is not null) + return (int) MurmurHash3.Hash32(keyBytes, 0) % partitionsCount; return Interlocked.Increment(ref _partitionIndex) % partitionsCount; } diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs index 3a2b563..69a82f5 100644 --- a/src/DotPulsar/SinglePartitionRouter.cs +++ b/src/DotPulsar/SinglePartitionRouter.cs @@ -17,7 +17,6 @@ namespace DotPulsar using Abstractions; using HashDepot; using System; - using System.Text; /// <summary> /// If no key is provided, the producer will randomly pick one single partition and publish all the messages @@ -38,13 +37,11 @@ namespace DotPulsar /// </summary> public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount) { - if (messageMetadata != null && !string.IsNullOrEmpty(messageMetadata.Key)) - { - return (int) MurmurHash3.Hash32(Encoding.UTF8.GetBytes(messageMetadata.Key ?? string.Empty), 0) % partitionsCount; - } - + var keyBytes = messageMetadata?.KeyBytes; + if (keyBytes is not null) + return (int) MurmurHash3.Hash32(keyBytes, 0) % partitionsCount; + _partitionIndex ??= new Random().Next(0, partitionsCount); - return _partitionIndex.Value; } }