This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 615090e  Changed the Producing sample from a console app to a worker 
service
615090e is described below

commit 615090e9f74a70d9535a1d3a6f151d364c5b3f54
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Feb 5 13:25:37 2025 +0100

    Changed the Producing sample from a console app to a worker service
---
 DotPulsar.sln                                      |  7 ++
 .../Extensions.csproj}                             | 10 +--
 .../Extensions/HostApplicationBuilderExtensions.cs | 33 +++++++++
 .../{Processing => Extensions}/LoggerExtensions.cs | 78 +++++++++++++++++-----
 samples/Processing/Processing.csproj               |  1 +
 samples/Processing/Program.cs                      |  7 +-
 samples/Processing/Worker.cs                       |  3 +-
 samples/Producing/Producing.csproj                 | 10 ++-
 samples/Producing/Program.cs                       | 57 +++-------------
 samples/Producing/Properties/launchSettings.json   | 11 +++
 samples/Producing/SendWorker.cs                    | 50 ++++++++++++++
 samples/Producing/appsettings.Development.json     |  8 +++
 samples/Producing/appsettings.json                 |  8 +++
 13 files changed, 208 insertions(+), 75 deletions(-)

diff --git a/DotPulsar.sln b/DotPulsar.sln
index ed8376d..032bfca 100644
--- a/DotPulsar.sln
+++ b/DotPulsar.sln
@@ -32,6 +32,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = 
"Benchmarks", "Benchmarks",
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SendChannel", 
"samples\SendChannel\SendChannel.csproj", 
"{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}"
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Extensions", 
"samples\Extensions\Extensions.csproj", "{F211960B-F2C6-4878-B1AD-72D63CDB3B7A}"
+EndProject
 Global
        GlobalSection(SolutionConfigurationPlatforms) = preSolution
                Debug|Any CPU = Debug|Any CPU
@@ -70,6 +72,10 @@ Global
                {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
                {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
                {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.Build.0 
= Release|Any CPU
+               {F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Debug|Any CPU.ActiveCfg 
= Debug|Any CPU
+               {F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
+               {F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
+               {F211960B-F2C6-4878-B1AD-72D63CDB3B7A}.Release|Any CPU.Build.0 
= Release|Any CPU
        EndGlobalSection
        GlobalSection(SolutionProperties) = preSolution
                HideSolutionNode = FALSE
@@ -82,6 +88,7 @@ Global
                {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
                {040F8253-074D-4977-BDB1-0D9798B52CE2} = 
{2C57AF4B-0D23-42D7-86FE-80277FD52875}
                {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
+               {F211960B-F2C6-4878-B1AD-72D63CDB3B7A} = 
{E7106D0F-B255-4631-9FB8-734FC5748FA9}
        EndGlobalSection
        GlobalSection(ExtensibilityGlobals) = postSolution
                SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789}
diff --git a/samples/Processing/Processing.csproj 
b/samples/Extensions/Extensions.csproj
similarity index 60%
copy from samples/Processing/Processing.csproj
copy to samples/Extensions/Extensions.csproj
index 1365045..54ab226 100644
--- a/samples/Processing/Processing.csproj
+++ b/samples/Extensions/Extensions.csproj
@@ -1,18 +1,20 @@
-<Project Sdk="Microsoft.NET.Sdk.Worker">
+<Project Sdk="Microsoft.NET.Sdk">
 
   <PropertyGroup>
     <TargetFramework>net9.0</TargetFramework>
-    <Nullable>enable</Nullable>
     <ImplicitUsings>enable</ImplicitUsings>
-    <NoWarn>CA2012</NoWarn>
+    <Nullable>enable</Nullable>
+    <NoWarn>IDE0060</NoWarn>
   </PropertyGroup>
 
   <ItemGroup>
+    <PackageReference Include="Microsoft.Extensions.DependencyInjection" 
Version="9.0.1" />
     <PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.1" />
+    <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" 
Version="9.0.1" />
   </ItemGroup>
 
   <ItemGroup>
     <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
   </ItemGroup>
-  
+
 </Project>
diff --git a/samples/Extensions/HostApplicationBuilderExtensions.cs 
b/samples/Extensions/HostApplicationBuilderExtensions.cs
new file mode 100644
index 0000000..303f1c1
--- /dev/null
+++ b/samples/Extensions/HostApplicationBuilderExtensions.cs
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Extensions;
+
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+
+/// <summary>
+/// Extensions for HostApplicationBuilder
+/// </summary>
+public static class HostApplicationBuilderExtensions
+{
+    /// <summary>
+    /// Add a hosted service to 'Services' and return the builder
+    /// </summary>
+    public static HostApplicationBuilder AddHostedService<THostedService>(this 
HostApplicationBuilder builder) where THostedService : class, IHostedService
+    {
+        builder.Services.AddHostedService<THostedService>();
+        return builder;
+    }
+}
diff --git a/samples/Processing/LoggerExtensions.cs 
b/samples/Extensions/LoggerExtensions.cs
similarity index 60%
rename from samples/Processing/LoggerExtensions.cs
rename to samples/Extensions/LoggerExtensions.cs
index 35154ef..b093d28 100644
--- a/samples/Processing/LoggerExtensions.cs
+++ b/samples/Extensions/LoggerExtensions.cs
@@ -12,15 +12,33 @@
  * limitations under the License.
  */
 
-namespace Processing;
+namespace Extensions;
 
 using DotPulsar;
 using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
+using Microsoft.Extensions.Logging;
 
+/// <summary>
+/// Extensions for ILogger
+/// </summary>
 public static partial class LoggerExtensions
 {
-    // Output Message
+    /// <summary>
+    /// Default logger for the PulsarClient exception handler
+    /// </summary>
+    public static void PulsarClientException(this ILogger logger, 
ExceptionContext exceptionContext)
+    {
+        if (exceptionContext.Exception is not ChannelNotReadyException && 
exceptionContext.Exception is not TaskCanceledException)
+            logger.PulsarClientException(exceptionContext.Exception);
+    }
+
+    [LoggerMessage(EventId = 0, Level = LogLevel.Warning, Message = "The 
PulsarClient got an exception")]
+    static partial void PulsarClientException(this ILogger logger, Exception 
exception);
+
+    /// <summary>
+    /// Default logger for an IMessage with a string value
+    /// </summary>
     public static void OutputMessage(this ILogger logger, IMessage<string> 
message)
     {
         var publishedOn = message.PublishTimeAsDateTime;
@@ -28,20 +46,12 @@ public static partial class LoggerExtensions
         logger.OutputMessage(publishedOn, payload);
     }
 
-    [LoggerMessage(EventId = 0, Level = LogLevel.Information, Message = 
"Received: '{payload}' published on {publishedOn}")]
+    [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = 
"Received: '{payload}' published on {publishedOn}")]
     static partial void OutputMessage(this ILogger logger, DateTime 
publishedOn, string payload);
 
-    // The Pulsar Client got an exception
-    public static void PulsarClientException(this ILogger logger, 
ExceptionContext exceptionContext)
-    {
-        if (exceptionContext.Exception is not ChannelNotReadyException && 
exceptionContext.Exception is not TaskCanceledException)
-            logger.PulsarClientException(exceptionContext.Exception);
-    }
-
-    [LoggerMessage(EventId = 1, Level = LogLevel.Warning, Message = "The 
PulsarClient got an exception")]
-    static partial void PulsarClientException(this ILogger logger, Exception 
exception);
-
-    // Consumer changed state
+    /// <summary>
+    /// Default logger for consumer state monitoring
+    /// </summary>
     public static void ConsumerChangedState(this ILogger logger, 
ConsumerStateChanged stateChanged)
     {
         var logLevel = stateChanged.ConsumerState switch
@@ -57,6 +67,42 @@ public static partial class LoggerExtensions
     [LoggerMessage(EventId = 2, Message = "The consumer for topic '{topic}' 
changed state to '{state}'")]
     static partial void ConsumerChangedState(this ILogger logger, LogLevel 
logLevel, string topic, string state);
 
+    /// <summary>
+    /// Default logger for producer state monitoring
+    /// </summary>
+    public static void ProducerChangedState(this ILogger logger, 
ProducerStateChanged stateChanged)
+    {
+        var logLevel = stateChanged.ProducerState switch
+        {
+            ProducerState.Disconnected => LogLevel.Warning,
+            ProducerState.Faulted => LogLevel.Error,
+            _ => LogLevel.Information
+        };
+
+        logger.ProducerChangedState(logLevel, stateChanged.Producer.Topic, 
stateChanged.ProducerState.ToString());
+    }
+
+    [LoggerMessage(EventId = 3, Message = "The producer for topic '{topic}' 
changed state to '{state}'")]
+    static partial void ProducerChangedState(this ILogger logger, LogLevel 
logLevel, string topic, string state);
+
+    /// <summary>
+    /// Default logger for reader state monitoring
+    /// </summary>
+    public static void ReaderChangedState(this ILogger logger, 
ReaderStateChanged stateChanged)
+    {
+        var logLevel = stateChanged.ReaderState switch
+        {
+            ReaderState.Disconnected => LogLevel.Warning,
+            ReaderState.Faulted => LogLevel.Error,
+            _ => LogLevel.Information
+        };
+
+        logger.ReaderChangedState(logLevel, stateChanged.Reader.Topic, 
stateChanged.ReaderState.ToString());
+    }
+
+    [LoggerMessage(EventId = 4, Message = "The reader for topic '{topic}' 
changed state to '{state}'")]
+    static partial void ReaderChangedState(this ILogger logger, LogLevel 
logLevel, string topic, string state);
+
     // Consumer lost connection
     public static ValueTask<string> ConsumerLostConnection(this ILogger 
logger, IConsumer consumer, ConsumerState state, CancellationToken 
cancellationToken)
     {
@@ -64,7 +110,7 @@ public static partial class LoggerExtensions
         return ValueTask.FromResult("TicketId"); // Simulating a ticket-id 
after sending an alert. This is optional.
     }
 
-    [LoggerMessage(EventId = 3, Level = LogLevel.Warning, Message = "The 
consumer for topic '{topic}' has lost the connection")]
+    [LoggerMessage(EventId = 5, Level = LogLevel.Warning, Message = "The 
consumer for topic '{topic}' has lost the connection")]
     static partial void ConsumerLostConnection(this ILogger logger, string 
topic);
 
     // Consumer regained connection
@@ -74,6 +120,6 @@ public static partial class LoggerExtensions
         return ValueTask.CompletedTask; // If an alert has been opened, this 
is where we can close it again
     }
 
-    [LoggerMessage(EventId = 4, Level = LogLevel.Warning, Message = "The 
consumer for topic '{topic}' has regained the connection")]
+    [LoggerMessage(EventId = 6, Level = LogLevel.Warning, Message = "The 
consumer for topic '{topic}' has regained the connection")]
     static partial void ConsumerRegainedConnection(this ILogger logger, string 
topic);
 }
diff --git a/samples/Processing/Processing.csproj 
b/samples/Processing/Processing.csproj
index 1365045..d611fd6 100644
--- a/samples/Processing/Processing.csproj
+++ b/samples/Processing/Processing.csproj
@@ -13,6 +13,7 @@
 
   <ItemGroup>
     <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+    <ProjectReference Include="..\Extensions\Extensions.csproj" />
   </ItemGroup>
   
 </Project>
diff --git a/samples/Processing/Program.cs b/samples/Processing/Program.cs
index ddb38c3..273cb18 100644
--- a/samples/Processing/Program.cs
+++ b/samples/Processing/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -12,10 +12,11 @@
  * limitations under the License.
  */
 
+using Extensions;
 using Processing;
 
 await Host
-    .CreateDefaultBuilder(args)
-    .ConfigureServices(services => services.AddHostedService<Worker>())
+    .CreateApplicationBuilder(args)
+    .AddHostedService<Worker>()
     .Build()
     .RunAsync();
diff --git a/samples/Processing/Worker.cs b/samples/Processing/Worker.cs
index d49bc64..7048c39 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Processing/Worker.cs
@@ -17,8 +17,9 @@ namespace Processing;
 using DotPulsar;
 using DotPulsar.Abstractions;
 using DotPulsar.Extensions;
+using Extensions;
 
-public class Worker : BackgroundService
+public sealed class Worker : BackgroundService
 {
     private readonly ILogger<Worker> _logger;
 
diff --git a/samples/Producing/Producing.csproj 
b/samples/Producing/Producing.csproj
index 1fd8f8f..15ce275 100644
--- a/samples/Producing/Producing.csproj
+++ b/samples/Producing/Producing.csproj
@@ -1,13 +1,19 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk.Worker">
 
   <PropertyGroup>
-    <OutputType>Exe</OutputType>
     <TargetFramework>net9.0</TargetFramework>
+    <Nullable>enable</Nullable>
     <ImplicitUsings>enable</ImplicitUsings>
+    <NoWarn>CA2254</NoWarn>
   </PropertyGroup>
 
+  <ItemGroup>
+    <PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.1" />
+  </ItemGroup>
+
   <ItemGroup>
     <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+    <ProjectReference Include="..\Extensions\Extensions.csproj" />
   </ItemGroup>
 
 </Project>
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 425bea3..276ed24 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -12,52 +12,11 @@
  * limitations under the License.
  */
 
-using DotPulsar;
-using DotPulsar.Abstractions;
-using DotPulsar.Extensions;
-
-var cts = new CancellationTokenSource();
-
-Console.CancelKeyPress += (sender, args) =>
-{
-    cts.Cancel();
-    args.Cancel = true;
-};
-
-await using var client = PulsarClient
-    .Builder()
-    .ExceptionHandler(ExceptionHandler)
-    .Build(); // Connecting to pulsar://localhost:6650
-
-await using var producer = client.NewProducer(Schema.String)
-    .StateChangedHandler(StateChangedHandler)
-    .Topic("persistent://public/default/mytopic")
-    .Create();
-
-Console.WriteLine("Press Ctrl+C to exit");
-
-await ProduceMessages(producer, cts.Token);
-
-async Task ProduceMessages(IProducer<string> producer, CancellationToken 
cancellationToken)
-{
-    var delay = TimeSpan.FromSeconds(5);
-
-    try
-    {
-        while (!cancellationToken.IsCancellationRequested)
-        {
-            var data = DateTime.UtcNow.ToLongTimeString();
-            _ = await producer.Send(data, cancellationToken);
-            Console.WriteLine($"Sent: {data}");
-            await Task.Delay(delay, cancellationToken);
-        }
-    }
-    catch (OperationCanceledException) // If not using the cancellationToken, 
then just dispose the producer and catch ObjectDisposedException instead
-    { }
-}
-
-void ExceptionHandler(ExceptionContext context) =>
-    Console.WriteLine($"The PulsarClient got an exception: 
{context.Exception}");
-
-void StateChangedHandler(ProducerStateChanged stateChanged) =>
-    Console.WriteLine($"The producer for topic '{stateChanged.Producer.Topic}' 
changed state to '{stateChanged.ProducerState}'");
+using Extensions;
+using Producing;
+
+await Host
+    .CreateApplicationBuilder(args)
+    .AddHostedService<SendWorker>()
+    .Build()
+    .RunAsync();
diff --git a/samples/Producing/Properties/launchSettings.json 
b/samples/Producing/Properties/launchSettings.json
new file mode 100644
index 0000000..2906cde
--- /dev/null
+++ b/samples/Producing/Properties/launchSettings.json
@@ -0,0 +1,11 @@
+{
+  "profiles": {
+    "Processing": {
+      "commandName": "Project",
+      "dotnetRunMessages": true,
+      "environmentVariables": {
+        "DOTNET_ENVIRONMENT": "Development"
+      }
+    }
+  }
+}
diff --git a/samples/Producing/SendWorker.cs b/samples/Producing/SendWorker.cs
new file mode 100644
index 0000000..65d3b4a
--- /dev/null
+++ b/samples/Producing/SendWorker.cs
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Producing;
+
+using DotPulsar;
+using DotPulsar.Extensions;
+using Extensions;
+
+public sealed class SendWorker : BackgroundService
+{
+    private readonly ILogger _logger;
+
+    public SendWorker(ILogger<SendWorker> logger) => _logger = logger;
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        await using var client = PulsarClient.Builder()
+            .ExceptionHandler(_logger.PulsarClientException) // Optional
+            .Build();                                        // Connecting to 
pulsar://localhost:6650
+
+        await using var producer = client.NewProducer(Schema.String)
+            .StateChangedHandler(_logger.ProducerChangedState) // Optional
+            .Topic("persistent://public/default/mytopic")
+            .Create();
+
+        var delay = TimeSpan.FromSeconds(5);
+
+        _logger.LogInformation($"Will start sending messages every 
{delay.TotalSeconds} seconds");
+
+        while (!stoppingToken.IsCancellationRequested)
+        {
+            var data = DateTime.UtcNow.ToLongTimeString();
+            var messageId = await producer.Send(data, stoppingToken);
+            _logger.LogInformation($"Sent message with content: '{data}' and 
got message id: '{messageId}'");
+            await Task.Delay(delay, stoppingToken);
+        }
+    }
+}
diff --git a/samples/Producing/appsettings.Development.json 
b/samples/Producing/appsettings.Development.json
new file mode 100644
index 0000000..b2dcdb6
--- /dev/null
+++ b/samples/Producing/appsettings.Development.json
@@ -0,0 +1,8 @@
+{
+  "Logging": {
+    "LogLevel": {
+      "Default": "Information",
+      "Microsoft.Hosting.Lifetime": "Information"
+    }
+  }
+}
diff --git a/samples/Producing/appsettings.json 
b/samples/Producing/appsettings.json
new file mode 100644
index 0000000..b2dcdb6
--- /dev/null
+++ b/samples/Producing/appsettings.json
@@ -0,0 +1,8 @@
+{
+  "Logging": {
+    "LogLevel": {
+      "Default": "Information",
+      "Microsoft.Hosting.Lifetime": "Information"
+    }
+  }
+}

Reply via email to