This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7a6d825 Add sample using send channel (#215) 7a6d825 is described below commit 7a6d825da5c31f662a1db35846143c0606bd824f Author: Kristian Andersen <kanderse...@users.noreply.github.com> AuthorDate: Mon Apr 22 08:45:09 2024 +0200 Add sample using send channel (#215) --- DotPulsar.sln | 7 +++ samples/SendChannel/Program.cs | 82 ++++++++++++++++++++++++++++++++++ samples/SendChannel/SendChannel.csproj | 12 +++++ 3 files changed, 101 insertions(+) diff --git a/DotPulsar.sln b/DotPulsar.sln index 1706858..ed8376d 100644 --- a/DotPulsar.sln +++ b/DotPulsar.sln @@ -30,6 +30,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Compression", "benchmarks\C EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarks", "Benchmarks", "{2C57AF4B-0D23-42D7-86FE-80277FD52875}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SendChannel", "samples\SendChannel\SendChannel.csproj", "{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -64,6 +66,10 @@ Global {040F8253-074D-4977-BDB1-0D9798B52CE2}.Debug|Any CPU.Build.0 = Debug|Any CPU {040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.ActiveCfg = Release|Any CPU {040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.Build.0 = Release|Any CPU + {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -75,6 +81,7 @@ Global {6D44683B-865C-4D15-9F0A-1A8441354589} = {E7106D0F-B255-4631-9FB8-734FC5748FA9} {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E} = {E7106D0F-B255-4631-9FB8-734FC5748FA9} {040F8253-074D-4977-BDB1-0D9798B52CE2} = {2C57AF4B-0D23-42D7-86FE-80277FD52875} + {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2} = {E7106D0F-B255-4631-9FB8-734FC5748FA9} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789} diff --git a/samples/SendChannel/Program.cs b/samples/SendChannel/Program.cs new file mode 100644 index 0000000..d79f038 --- /dev/null +++ b/samples/SendChannel/Program.cs @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace SendChannel; + +using DotPulsar; +using DotPulsar.Abstractions; +using DotPulsar.Extensions; +using System; +using System.Threading; +using System.Threading.Tasks; + +internal static class Program +{ + private static async Task Main() + { + var cts = new CancellationTokenSource(); + + Console.CancelKeyPress += (sender, args) => + { + cts.Cancel(); + args.Cancel = true; + }; + + await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650 + + await using var producer = client.NewProducer(Schema.String) + .StateChangedHandler(Monitor) + .Topic("persistent://public/default/mytopic") + .Create(); + + Console.WriteLine("Press Ctrl+C to exit"); + + var sendChannel = producer.SendChannel; + await ProduceMessages(sendChannel, 1000, cts.Token); + sendChannel.Complete(); + + var shutdownCts = new CancellationTokenSource(); + shutdownCts.CancelAfter(TimeSpan.FromSeconds(30)); + await sendChannel.Completion(shutdownCts.Token); + } + + private static async Task ProduceMessages(ISendChannel<string> sendChannel, int messages, CancellationToken cancellationToken) + { + try + { + int i = 0; + while (++i <= messages && !cancellationToken.IsCancellationRequested) + { + var data = DateTime.UtcNow.ToLongTimeString(); + + await sendChannel.Send(data, id => + { + Console.WriteLine($"Received acknowledgement for {id}"); + return ValueTask.CompletedTask; + }, cancellationToken); + + Console.WriteLine($"Sent: {data}"); + } + } + catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead + { } + } + + private static void Monitor(ProducerStateChanged stateChanged) + { + var topic = stateChanged.Producer.Topic; + var state = stateChanged.ProducerState; + Console.WriteLine($"The producer for topic '{topic}' changed state to '{state}'"); + } +} diff --git a/samples/SendChannel/SendChannel.csproj b/samples/SendChannel/SendChannel.csproj new file mode 100644 index 0000000..9384afa --- /dev/null +++ b/samples/SendChannel/SendChannel.csproj @@ -0,0 +1,12 @@ +<Project Sdk="Microsoft.NET.Sdk"> + + <PropertyGroup> + <OutputType>Exe</OutputType> + <TargetFramework>net8.0</TargetFramework> + </PropertyGroup> + + <ItemGroup> + <ProjectReference Include="..\..\src\DotPulsar\DotPulsar.csproj" /> + </ItemGroup> + +</Project>