This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a6d825  Add sample using send channel (#215)
7a6d825 is described below

commit 7a6d825da5c31f662a1db35846143c0606bd824f
Author: Kristian Andersen <kanderse...@users.noreply.github.com>
AuthorDate: Mon Apr 22 08:45:09 2024 +0200

    Add sample using send channel (#215)
---
 DotPulsar.sln                          |  7 +++
 samples/SendChannel/Program.cs         | 82 ++++++++++++++++++++++++++++++++++
 samples/SendChannel/SendChannel.csproj | 12 +++++
 3 files changed, 101 insertions(+)

diff --git a/DotPulsar.sln b/DotPulsar.sln
index 1706858..ed8376d 100644
--- a/DotPulsar.sln
+++ b/DotPulsar.sln
@@ -30,6 +30,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Compression", "benchmarks\C
 EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarks", "Benchmarks", "{2C57AF4B-0D23-42D7-86FE-80277FD52875}"
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SendChannel", "samples\SendChannel\SendChannel.csproj", "{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}"
+EndProject
 Global
        GlobalSection(SolutionConfigurationPlatforms) = preSolution
                Debug|Any CPU = Debug|Any CPU
@@ -64,6 +66,10 @@ Global
                {040F8253-074D-4977-BDB1-0D9798B52CE2}.Debug|Any CPU.Build.0 = Debug|Any CPU
                {040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.ActiveCfg = Release|Any CPU
                {040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.Build.0 = Release|Any CPU
+               {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+               {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+               {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+               {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.Build.0 = Release|Any CPU
        EndGlobalSection
        GlobalSection(SolutionProperties) = preSolution
                HideSolutionNode = FALSE
@@ -75,6 +81,7 @@ Global
                {6D44683B-865C-4D15-9F0A-1A8441354589} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
                {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
                {040F8253-074D-4977-BDB1-0D9798B52CE2} = {2C57AF4B-0D23-42D7-86FE-80277FD52875}
+               {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
        EndGlobalSection
        GlobalSection(ExtensibilityGlobals) = postSolution
                SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789}
diff --git a/samples/SendChannel/Program.cs b/samples/SendChannel/Program.cs
new file mode 100644
index 0000000..d79f038
--- /dev/null
+++ b/samples/SendChannel/Program.cs
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace SendChannel;
+
+using DotPulsar;
+using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+internal static class Program
+{
+    private static async Task Main()
+    {
+        var cts = new CancellationTokenSource();
+
+        Console.CancelKeyPress += (sender, args) =>
+        {
+            cts.Cancel();
+            args.Cancel = true;
+        };
+
+        await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650
+
+        await using var producer = client.NewProducer(Schema.String)
+            .StateChangedHandler(Monitor)
+            .Topic("persistent://public/default/mytopic")
+            .Create();
+
+        Console.WriteLine("Press Ctrl+C to exit");
+
+        var sendChannel = producer.SendChannel;
+        await ProduceMessages(sendChannel, 1000, cts.Token);
+        sendChannel.Complete();
+
+        var shutdownCts = new CancellationTokenSource();
+        shutdownCts.CancelAfter(TimeSpan.FromSeconds(30));
+        await sendChannel.Completion(shutdownCts.Token);
+    }
+
+    private static async Task ProduceMessages(ISendChannel<string> sendChannel, int messages, CancellationToken cancellationToken)
+    {
+        try
+        {
+            int i = 0;
+            while (++i <= messages && !cancellationToken.IsCancellationRequested)
+            {
+                var data = DateTime.UtcNow.ToLongTimeString();
+
+                await sendChannel.Send(data, id =>
+                {
+                    Console.WriteLine($"Received acknowledgement for {id}");
+                    return ValueTask.CompletedTask;
+                }, cancellationToken);
+
+                Console.WriteLine($"Sent: {data}");
+            }
+        }
+        catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead
+        { }
+    }
+
+    private static void Monitor(ProducerStateChanged stateChanged)
+    {
+        var topic = stateChanged.Producer.Topic;
+        var state = stateChanged.ProducerState;
+        Console.WriteLine($"The producer for topic '{topic}' changed state to '{state}'");
+    }
+}
diff --git a/samples/SendChannel/SendChannel.csproj b/samples/SendChannel/SendChannel.csproj
new file mode 100644
index 0000000..9384afa
--- /dev/null
+++ b/samples/SendChannel/SendChannel.csproj
@@ -0,0 +1,12 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net8.0</TargetFramework>
+  </PropertyGroup>
+
+    <ItemGroup>
+      <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" />
+    </ItemGroup>
+
+</Project>

Reply via email to