This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 615090e Changed the Producing sample from a console app to a worker
service
615090e is described below
commit 615090e9f74a70d9535a1d3a6f151d364c5b3f54
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Feb 5 13:25:37 2025 +0100
Changed the Producing sample from a console app to a worker service
---
DotPulsar.sln | 7 ++
.../Extensions.csproj} | 10 +--
.../Extensions/HostApplicationBuilderExtensions.cs | 33 +++++++++
.../{Processing => Extensions}/LoggerExtensions.cs | 78 +++++++++++++++++-----
samples/Processing/Processing.csproj | 1 +
samples/Processing/Program.cs | 7 +-
samples/Processing/Worker.cs | 3 +-
samples/Producing/Producing.csproj | 10 ++-
samples/Producing/Program.cs | 57 +++-------------
samples/Producing/Properties/launchSettings.json | 11 +++
samples/Producing/SendWorker.cs | 50 ++++++++++++++
samples/Producing/appsettings.Development.json | 8 +++
samples/Producing/appsettings.json | 8 +++
13 files changed, 208 insertions(+), 75 deletions(-)
diff --git a/DotPulsar.sln b/DotPulsar.sln
index ed8376d..032bfca 100644
--- a/DotPulsar.sln
+++ b/DotPulsar.sln
@@ -32,6 +32,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") =
"Benchmarks", "Benchmarks",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SendChannel",
"samples\SendChannel\SendChannel.csproj",
"{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Extensions",
"samples\Extensions\Extensions.csproj", "{F211960B-F2C6-4878-B1AD-72D63CDB3B7A}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -70,6 +72,10 @@ Global
{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.Build.0 =
Debug|Any CPU
{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any
CPU.ActiveCfg = Release|Any CPU
{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.Build.0
= Release|Any CPU
+ {F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Debug|Any CPU.ActiveCfg
= Debug|Any CPU
+ {F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Debug|Any CPU.Build.0 =
Debug|Any CPU
+ {F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Release|Any
CPU.ActiveCfg = Release|Any CPU
+ {F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Release|Any CPU.Build.0
= Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -82,6 +88,7 @@ Global
{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E} =
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
{040F8253-074D-4977-BDB1-0D9798B52CE2} =
{2C57AF4B-0D23-42D7-86FE-80277FD52875}
{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2} =
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
+ {F211960B-F2C6-4878-B1AD-72D63CDB3B7A} =
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789}
diff --git a/samples/Processing/Processing.csproj
b/samples/Extensions/Extensions.csproj
similarity index 60%
copy from samples/Processing/Processing.csproj
copy to samples/Extensions/Extensions.csproj
index 1365045..54ab226 100644
--- a/samples/Processing/Processing.csproj
+++ b/samples/Extensions/Extensions.csproj
@@ -1,18 +1,20 @@
-<Project Sdk="Microsoft.NET.Sdk.Worker">
+<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
- <Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
- <NoWarn>CA2012</NoWarn>
+ <Nullable>enable</Nullable>
+ <NoWarn>IDE0060</NoWarn>
</PropertyGroup>
<ItemGroup>
+ <PackageReference Include="Microsoft.Extensions.DependencyInjection"
Version="9.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.1" />
+ <PackageReference Include="Microsoft.Extensions.Logging.Abstractions"
Version="9.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
</ItemGroup>
-
+
</Project>
diff --git a/samples/Extensions/HostApplicationBuilderExtensions.cs
b/samples/Extensions/HostApplicationBuilderExtensions.cs
new file mode 100644
index 0000000..303f1c1
--- /dev/null
+++ b/samples/Extensions/HostApplicationBuilderExtensions.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Extensions;
+
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+
+/// <summary>
+/// Extensions for HostApplicationBuilder
+/// </summary>
+public static class HostApplicationBuilderExtensions
+{
+ /// <summary>
+ /// Add a hosted service to 'Services' and return the builder
+ /// </summary>
+ /// <typeparam name="THostedService">The IHostedService implementation to register</typeparam>
+ /// <param name="builder">The builder whose 'Services' collection receives the registration</param>
+ /// <returns>The same builder instance that was passed in, so further calls can be chained on it</returns>
+ public static HostApplicationBuilder AddHostedService<THostedService>(this
HostApplicationBuilder builder) where THostedService : class, IHostedService
+ {
+ builder.Services.AddHostedService<THostedService>();
+ return builder;
+ }
+}
diff --git a/samples/Processing/LoggerExtensions.cs
b/samples/Extensions/LoggerExtensions.cs
similarity index 60%
rename from samples/Processing/LoggerExtensions.cs
rename to samples/Extensions/LoggerExtensions.cs
index 35154ef..b093d28 100644
--- a/samples/Processing/LoggerExtensions.cs
+++ b/samples/Extensions/LoggerExtensions.cs
@@ -12,15 +12,33 @@
* limitations under the License.
*/
-namespace Processing;
+namespace Extensions;
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+using Microsoft.Extensions.Logging;
+/// <summary>
+/// Extensions for ILogger
+/// </summary>
public static partial class LoggerExtensions
{
- // Output Message
+ /// <summary>
+ /// Default logger for the PulsarClient exception handler
+ /// </summary>
+ public static void PulsarClientException(this ILogger logger,
ExceptionContext exceptionContext)
+ {
+ if (exceptionContext.Exception is not ChannelNotReadyException &&
exceptionContext.Exception is not TaskCanceledException)
+ logger.PulsarClientException(exceptionContext.Exception);
+ }
+
+ [LoggerMessage(EventId = 0, Level = LogLevel.Warning, Message = "The
PulsarClient got an exception")]
+ static partial void PulsarClientException(this ILogger logger, Exception
exception);
+
+ /// <summary>
+ /// Default logger for an IMessage with a string value
+ /// </summary>
public static void OutputMessage(this ILogger logger, IMessage<string>
message)
{
var publishedOn = message.PublishTimeAsDateTime;
@@ -28,20 +46,12 @@ public static partial class LoggerExtensions
logger.OutputMessage(publishedOn, payload);
}
- [LoggerMessage(EventId = 0, Level = LogLevel.Information, Message =
"Received: '{payload}' published on {publishedOn}")]
+ [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message =
"Received: '{payload}' published on {publishedOn}")]
static partial void OutputMessage(this ILogger logger, DateTime
publishedOn, string payload);
- // The Pulsar Client got an exception
- public static void PulsarClientException(this ILogger logger,
ExceptionContext exceptionContext)
- {
- if (exceptionContext.Exception is not ChannelNotReadyException &&
exceptionContext.Exception is not TaskCanceledException)
- logger.PulsarClientException(exceptionContext.Exception);
- }
-
- [LoggerMessage(EventId = 1, Level = LogLevel.Warning, Message = "The
PulsarClient got an exception")]
- static partial void PulsarClientException(this ILogger logger, Exception
exception);
-
- // Consumer changed state
+ /// <summary>
+ /// Default logger for consumer state monitoring
+ /// </summary>
public static void ConsumerChangedState(this ILogger logger,
ConsumerStateChanged stateChanged)
{
var logLevel = stateChanged.ConsumerState switch
@@ -57,6 +67,42 @@ public static partial class LoggerExtensions
[LoggerMessage(EventId = 2, Message = "The consumer for topic '{topic}'
changed state to '{state}'")]
static partial void ConsumerChangedState(this ILogger logger, LogLevel
logLevel, string topic, string state);
+ /// <summary>
+ /// Default logger for producer state monitoring
+ /// </summary>
+ public static void ProducerChangedState(this ILogger logger,
ProducerStateChanged stateChanged)
+ {
+ var logLevel = stateChanged.ProducerState switch
+ {
+ ProducerState.Disconnected => LogLevel.Warning,
+ ProducerState.Faulted => LogLevel.Error,
+ _ => LogLevel.Information
+ };
+
+ logger.ProducerChangedState(logLevel, stateChanged.Producer.Topic,
stateChanged.ProducerState.ToString());
+ }
+
+ [LoggerMessage(EventId = 3, Message = "The producer for topic '{topic}'
changed state to '{state}'")]
+ static partial void ProducerChangedState(this ILogger logger, LogLevel
logLevel, string topic, string state);
+
+ /// <summary>
+ /// Default logger for reader state monitoring
+ /// </summary>
+ public static void ReaderChangedState(this ILogger logger,
ReaderStateChanged stateChanged)
+ {
+ var logLevel = stateChanged.ReaderState switch
+ {
+ ReaderState.Disconnected => LogLevel.Warning,
+ ReaderState.Faulted => LogLevel.Error,
+ _ => LogLevel.Information
+ };
+
+ logger.ReaderChangedState(logLevel, stateChanged.Reader.Topic,
stateChanged.ReaderState.ToString());
+ }
+
+ [LoggerMessage(EventId = 4, Message = "The reader for topic '{topic}'
changed state to '{state}'")]
+ static partial void ReaderChangedState(this ILogger logger, LogLevel
logLevel, string topic, string state);
+
// Consumer lost connection
public static ValueTask<string> ConsumerLostConnection(this ILogger
logger, IConsumer consumer, ConsumerState state, CancellationToken
cancellationToken)
{
@@ -64,7 +110,7 @@ public static partial class LoggerExtensions
return ValueTask.FromResult("TicketId"); // Simulating a ticket-id
after sending an alert. This is optional.
}
- [LoggerMessage(EventId = 3, Level = LogLevel.Warning, Message = "The
consumer for topic '{topic}' has lost the connection")]
+ [LoggerMessage(EventId = 5, Level = LogLevel.Warning, Message = "The
consumer for topic '{topic}' has lost the connection")]
static partial void ConsumerLostConnection(this ILogger logger, string
topic);
// Consumer regained connection
@@ -74,6 +120,6 @@ public static partial class LoggerExtensions
return ValueTask.CompletedTask; // If an alert has been opened, this
is where we can close it again
}
- [LoggerMessage(EventId = 4, Level = LogLevel.Warning, Message = "The
consumer for topic '{topic}' has regained the connection")]
+ [LoggerMessage(EventId = 6, Level = LogLevel.Warning, Message = "The
consumer for topic '{topic}' has regained the connection")]
static partial void ConsumerRegainedConnection(this ILogger logger, string
topic);
}
diff --git a/samples/Processing/Processing.csproj
b/samples/Processing/Processing.csproj
index 1365045..d611fd6 100644
--- a/samples/Processing/Processing.csproj
+++ b/samples/Processing/Processing.csproj
@@ -13,6 +13,7 @@
<ItemGroup>
<ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+ <ProjectReference Include="..\Extensions\Extensions.csproj" />
</ItemGroup>
</Project>
diff --git a/samples/Processing/Program.cs b/samples/Processing/Program.cs
index ddb38c3..273cb18 100644
--- a/samples/Processing/Program.cs
+++ b/samples/Processing/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,10 +12,11 @@
* limitations under the License.
*/
+using Extensions;
using Processing;
await Host
- .CreateDefaultBuilder(args)
- .ConfigureServices(services => services.AddHostedService<Worker>())
+ .CreateApplicationBuilder(args)
+ .AddHostedService<Worker>()
.Build()
.RunAsync();
diff --git a/samples/Processing/Worker.cs b/samples/Processing/Worker.cs
index d49bc64..7048c39 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Processing/Worker.cs
@@ -17,8 +17,9 @@ namespace Processing;
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Extensions;
+using Extensions;
-public class Worker : BackgroundService
+public sealed class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
diff --git a/samples/Producing/Producing.csproj
b/samples/Producing/Producing.csproj
index 1fd8f8f..15ce275 100644
--- a/samples/Producing/Producing.csproj
+++ b/samples/Producing/Producing.csproj
@@ -1,13 +1,19 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk.Worker">
<PropertyGroup>
- <OutputType>Exe</OutputType>
<TargetFramework>net9.0</TargetFramework>
+ <Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
+ <NoWarn>CA2254</NoWarn>
</PropertyGroup>
+ <ItemGroup>
+ <PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.1" />
+ </ItemGroup>
+
<ItemGroup>
<ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+ <ProjectReference Include="..\Extensions\Extensions.csproj" />
</ItemGroup>
</Project>
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 425bea3..276ed24 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -12,52 +12,11 @@
* limitations under the License.
*/
-using DotPulsar;
-using DotPulsar.Abstractions;
-using DotPulsar.Extensions;
-
-var cts = new CancellationTokenSource();
-
-Console.CancelKeyPress += (sender, args) =>
-{
- cts.Cancel();
- args.Cancel = true;
-};
-
-await using var client = PulsarClient
- .Builder()
- .ExceptionHandler(ExceptionHandler)
- .Build(); // Connecting to pulsar://localhost:6650
-
-await using var producer = client.NewProducer(Schema.String)
- .StateChangedHandler(StateChangedHandler)
- .Topic("persistent://public/default/mytopic")
- .Create();
-
-Console.WriteLine("Press Ctrl+C to exit");
-
-await ProduceMessages(producer, cts.Token);
-
-async Task ProduceMessages(IProducer<string> producer, CancellationToken
cancellationToken)
-{
- var delay = TimeSpan.FromSeconds(5);
-
- try
- {
- while (!cancellationToken.IsCancellationRequested)
- {
- var data = DateTime.UtcNow.ToLongTimeString();
- _ = await producer.Send(data, cancellationToken);
- Console.WriteLine($"Sent: {data}");
- await Task.Delay(delay, cancellationToken);
- }
- }
- catch (OperationCanceledException) // If not using the cancellationToken,
then just dispose the producer and catch ObjectDisposedException instead
- { }
-}
-
-void ExceptionHandler(ExceptionContext context) =>
- Console.WriteLine($"The PulsarClient got an exception:
{context.Exception}");
-
-void StateChangedHandler(ProducerStateChanged stateChanged) =>
- Console.WriteLine($"The producer for topic '{stateChanged.Producer.Topic}'
changed state to '{stateChanged.ProducerState}'");
+using Extensions;
+using Producing;
+
+// Create the application builder, register SendWorker as a hosted service, build the host and run it
+await Host
+ .CreateApplicationBuilder(args)
+ .AddHostedService<SendWorker>()
+ .Build()
+ .RunAsync();
diff --git a/samples/Producing/Properties/launchSettings.json
b/samples/Producing/Properties/launchSettings.json
new file mode 100644
index 0000000..2906cde
--- /dev/null
+++ b/samples/Producing/Properties/launchSettings.json
@@ -0,0 +1,11 @@
+{
+ "profiles": {
+ "Producing": {
+ "commandName": "Project",
+ "dotnetRunMessages": true,
+ "environmentVariables": {
+ "DOTNET_ENVIRONMENT": "Development"
+ }
+ }
+ }
+}
diff --git a/samples/Producing/SendWorker.cs b/samples/Producing/SendWorker.cs
new file mode 100644
index 0000000..65d3b4a
--- /dev/null
+++ b/samples/Producing/SendWorker.cs
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Producing;
+
+using DotPulsar;
+using DotPulsar.Extensions;
+using Extensions;
+
+/// <summary>
+/// Background service that sends the current UTC time, formatted as a string, to a topic every five seconds
+/// </summary>
+public sealed class SendWorker : BackgroundService
+{
+    private readonly ILogger _logger;
+
+    public SendWorker(ILogger<SendWorker> logger) => _logger = logger;
+
+    /// <summary>
+    /// Creates the client and the producer and keeps sending messages until cancellation is requested
+    /// </summary>
+    /// <param name="stoppingToken">Passed to every send and delay so that a shutdown interrupts the loop</param>
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        await using var client = PulsarClient.Builder()
+            .ExceptionHandler(_logger.PulsarClientException) // Optional
+            .Build(); // Connecting to pulsar://localhost:6650
+
+        await using var producer = client.NewProducer(Schema.String)
+            .StateChangedHandler(_logger.ProducerChangedState) // Optional
+            .Topic("persistent://public/default/mytopic")
+            .Create();
+
+        var delay = TimeSpan.FromSeconds(5);
+
+        // Constant message templates (instead of interpolated strings) keep the values as structured
+        // log properties and skip the formatting when the log level is disabled (see CA2254)
+        _logger.LogInformation("Will start sending messages every {DelayInSeconds} seconds", delay.TotalSeconds);
+
+        while (!stoppingToken.IsCancellationRequested)
+        {
+            var data = DateTime.UtcNow.ToLongTimeString();
+            var messageId = await producer.Send(data, stoppingToken);
+            _logger.LogInformation("Sent message with content: '{Content}' and got message id: '{MessageId}'", data, messageId);
+            await Task.Delay(delay, stoppingToken);
+        }
+    }
+}
diff --git a/samples/Producing/appsettings.Development.json
b/samples/Producing/appsettings.Development.json
new file mode 100644
index 0000000..b2dcdb6
--- /dev/null
+++ b/samples/Producing/appsettings.Development.json
@@ -0,0 +1,8 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.Hosting.Lifetime": "Information"
+ }
+ }
+}
diff --git a/samples/Producing/appsettings.json
b/samples/Producing/appsettings.json
new file mode 100644
index 0000000..b2dcdb6
--- /dev/null
+++ b/samples/Producing/appsettings.json
@@ -0,0 +1,8 @@
+{
+ "Logging": {
+ "LogLevel": {
+ "Default": "Information",
+ "Microsoft.Hosting.Lifetime": "Information"
+ }
+ }
+}