This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push: new b7c1910 Updating NuGet package and adding topic property to IConsumer, IProducer and IReader. b7c1910 is described below commit b7c1910d3047452dfebbdd3ea6afc36c26bb7ea0 Author: Daniel Blankensteiner <d...@vmail.dk> AuthorDate: Thu Apr 2 23:04:54 2020 +0200 Updating NuGet package and adding topic property to IConsumer, IProducer and IReader. --- src/DotPulsar/Abstractions/IConsumer.cs | 7 ++++++- src/DotPulsar/Abstractions/IProducer.cs | 7 ++++++- src/DotPulsar/Abstractions/IReader.cs | 7 ++++++- src/DotPulsar/Internal/Consumer.cs | 6 +++++- src/DotPulsar/Internal/Producer.cs | 6 +++++- src/DotPulsar/Internal/Reader.cs | 6 +++++- src/DotPulsar/PulsarClient.cs | 11 +++++------ tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj | 2 +- tests/DotPulsar.Tests/DotPulsar.Tests.csproj | 2 +- 9 files changed, 40 insertions(+), 14 deletions(-) diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs index 4d1c54c..4d33ab7 100644 --- a/src/DotPulsar/Abstractions/IConsumer.cs +++ b/src/DotPulsar/Abstractions/IConsumer.cs @@ -1,4 +1,4 @@ -/* +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -60,6 +60,11 @@ namespace DotPulsar.Abstractions ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default); /// <summary> + /// The topic of the consumer. + /// </summary> + string Topic { get; } + + /// <summary> /// Unsubscribe the consumer. /// </summary> ValueTask Unsubscribe(CancellationToken cancellationToken = default); diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs index dec0362..11e46d1 100644 --- a/src/DotPulsar/Abstractions/IProducer.cs +++ b/src/DotPulsar/Abstractions/IProducer.cs @@ -1,4 +1,4 @@ -/* +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -53,5 +53,10 @@ namespace DotPulsar.Abstractions /// Sends a message with metadata. /// </summary> ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken = default); + + /// <summary> + /// The topic of the producer. + /// </summary> + string Topic { get; } } } diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs index d3a04c1..091551c 100644 --- a/src/DotPulsar/Abstractions/IReader.cs +++ b/src/DotPulsar/Abstractions/IReader.cs @@ -1,4 +1,4 @@ -/* +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -27,5 +27,10 @@ namespace DotPulsar.Abstractions /// Get an IAsyncEnumerable for reading messages /// </summary> IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default); + + /// <summary> + /// The topic of the reader. + /// </summary> + string Topic { get; } } } diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs index d7b4711..9fbdc37 100644 --- a/src/DotPulsar/Internal/Consumer.cs +++ b/src/DotPulsar/Internal/Consumer.cs @@ -1,4 +1,4 @@ -/* +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -35,14 +35,18 @@ namespace DotPulsar.Internal private readonly IStateChanged<ConsumerState> _state; private int _isDisposed; + public string Topic { get; } + public Consumer( Guid correlationId, + string topic, IRegisterEvent eventRegister, IConsumerChannel initialChannel, IExecute executor, IStateChanged<ConsumerState> state) { _correlationId = correlationId; + Topic = topic; _eventRegister = eventRegister; _channel = initialChannel; _executor = executor; diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs index 9a67d64..d9e5dd0 100644 --- a/src/DotPulsar/Internal/Producer.cs +++ b/src/DotPulsar/Internal/Producer.cs @@ -1,4 +1,4 @@ -/* +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -32,14 +32,18 @@ namespace DotPulsar.Internal private readonly IStateChanged<ProducerState> _state; private int _isDisposed; + public string Topic { get; } + public Producer( Guid correlationId, + string topic, IRegisterEvent registerEvent, IProducerChannel initialChannel, IExecute executor, IStateChanged<ProducerState> state) { _correlationId = correlationId; + Topic = topic; _eventRegister = registerEvent; _channel = initialChannel; _executor = executor; diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs index d559663..5b7d740 100644 --- a/src/DotPulsar/Internal/Reader.cs +++ b/src/DotPulsar/Internal/Reader.cs @@ -1,4 +1,4 @@ -/* +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -33,14 +33,18 @@ namespace DotPulsar.Internal private readonly IStateChanged<ReaderState> _state; private int _isDisposed; + public string Topic { get; } + public Reader( Guid correlationId, + string topic, IRegisterEvent eventRegister, IReaderChannel initialChannel, IExecute executor, IStateChanged<ReaderState> state) { _correlationId = correlationId; + Topic = topic; _eventRegister = eventRegister; _channel = initialChannel; _executor = executor; diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs index 46dcfa9..cdde546 100644 --- a/src/DotPulsar/PulsarClient.cs +++ b/src/DotPulsar/PulsarClient.cs @@ -1,4 +1,4 @@ -/* +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -48,7 +48,7 @@ namespace DotPulsar var executor = new Executor(correlationId, _processManager, _exceptionHandler); var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options); var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted); - var producer = new Producer(correlationId, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager); + var producer = new Producer(correlationId, options.Topic, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager); var process = new ProducerProcess(correlationId, stateManager, factory, producer); _processManager.Add(process); process.Start(); @@ -62,9 +62,8 @@ namespace DotPulsar var executor = new Executor(correlationId, _processManager, _exceptionHandler); var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options); - var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, - ConsumerState.Faulted); - var consumer = new Consumer(correlationId, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager); + var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted); + var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager); var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover); _processManager.Add(process); process.Start(); @@ -78,7 +77,7 @@ namespace DotPulsar var executor = new Executor(correlationId, _processManager, _exceptionHandler); var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options); var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted); - var reader = new Reader(correlationId, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager); + var reader = new Reader(correlationId, options.Topic, _processManager, new NotReadyChannel(), new AsyncLockExecutor(executor), stateManager); var process = new ReaderProcess(correlationId, stateManager, factory, reader); _processManager.Add(process); process.Start(); diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj index d7dadce..c92fa9d 100644 --- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj +++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj @@ -12,7 +12,7 @@ <PrivateAssets>all</PrivateAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> </PackageReference> - <PackageReference Include="coverlet.collector" Version="1.2.0"> + <PackageReference Include="coverlet.collector" Version="1.2.1"> <PrivateAssets>all</PrivateAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> </PackageReference> diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj index 2f62d2d..3eccafe 100644 --- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj +++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj @@ -12,7 +12,7 @@ <PrivateAssets>all</PrivateAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> </PackageReference> - <PackageReference Include="coverlet.collector" Version="1.2.0"> + <PackageReference Include="coverlet.collector" Version="1.2.1"> <PrivateAssets>all</PrivateAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> </PackageReference>