This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2633d59 Add trace linking. Needs testing.
2633d59 is described below
commit 2633d59475ed8d14a1e0d4131a050b89da6ffa90
Author: Daniel Blankensteiner <[email protected]>
AuthorDate: Wed Oct 26 22:18:23 2022 +0200
Add trace linking. Needs testing.
---
CHANGELOG.md | 10 +++++-
src/DotPulsar/Abstractions/IProducerBuilder.cs | 5 +++
src/DotPulsar/Internal/Constants.cs | 4 +++
src/DotPulsar/Internal/DotPulsarActivitySource.cs | 43 +++++++++++++++++------
src/DotPulsar/Internal/MessageProcessor.cs | 12 ++++---
src/DotPulsar/Internal/Producer.cs | 9 +++++
src/DotPulsar/Internal/ProducerBuilder.cs | 9 +++++
src/DotPulsar/ProcessingOptions.cs | 23 ++++++++----
src/DotPulsar/ProducerOptions.cs | 6 ++++
9 files changed, 98 insertions(+), 23 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f808077..afe645e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,7 +8,15 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/)
### Added
-- The 'Process' extension method for IConsumer\<TMessage\> can now be used for
parallel processing of messages
+- The 'Process' extension method for IConsumer\<TMessage\> can now be used for
parallel processing of messages. You can control:
+ - Whether ordered acknowledgment should be enforced
+ - The maximum number of messages that may be processed concurrently
+ - The maximum number of messages that may be processed per task
+ - The TaskScheduler to use for scheduling tasks
+- The trace created when sending a message can now be automatically linked to
by traces created when consuming the message. All you have to do is:
+ - Make sure 'messaging.trace_parent' and 'messaging.trace_state' are not
already in use in the message's metadata (properties)
+ - Set 'AttachTraceInfoToMessages' on ProducerOptions or IProducerBuilder
when creating a producer
+ - Set 'LinkTraces' on the ProcessingOptions passed to the 'Process'
extension method for IConsumer\<TMessage\>
## [2.4.1] - 2022-09-16
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs
b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 6c27c80..a858af1 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -19,6 +19,11 @@ namespace DotPulsar.Abstractions;
/// </summary>
public interface IProducerBuilder<TMessage>
{
+ /// <summary>
+ /// Whether to attach the sending trace's parent and state to the outgoing
messages' metadata. The default is 'false'.
+ /// </summary>
+ IProducerBuilder<TMessage> AttachTraceInfoToMessages(bool
attachTraceInfoToMessages);
+
/// <summary>
/// Set the compression type. The default is 'None'.
/// </summary>
diff --git a/src/DotPulsar/Internal/Constants.cs
b/src/DotPulsar/Internal/Constants.cs
index 8e1b117..f579818 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -41,6 +41,8 @@ public static class Constants
MetadataSizeOffset = 6;
MetadataOffset = 10;
ConversationId = "messaging.conversation_id";
+ TraceParent = "messaging.trace_parent";
+ TraceState = "messaging.trace_state";
TimestampToTicks = TimeSpan.TicksPerSecond / (double)
Stopwatch.Frequency;
}
@@ -55,5 +57,7 @@ public static class Constants
public static int MetadataSizeOffset { get; }
public static int MetadataOffset { get; }
public static string ConversationId { get; }
+ public static string TraceParent { get; }
+ public static string TraceState { get; }
public static double TimestampToTicks { get; }
}
diff --git a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 4e24458..114e215 100644
--- a/src/DotPulsar/Internal/DotPulsarActivitySource.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -29,12 +29,21 @@ public static class DotPulsarActivitySource
public static ActivitySource ActivitySource { get; }
- public static Activity? StartConsumerActivity(IMessage message, string
operationName, KeyValuePair<string, object?>[] tags)
+ public static Activity? StartConsumerActivity(IMessage message, string
operationName, KeyValuePair<string, object?>[] tags, bool autoLinkTraces)
{
if (!ActivitySource.HasListeners())
return null;
- return StartActivity(operationName, ActivityKind.Consumer, tags,
message.GetConversationId());
+ IEnumerable<ActivityLink>? activityLinks = null;
+
+ if (autoLinkTraces)
+ {
+ var activityLink = GetActivityLink(message);
+ if (activityLink is not null)
+ activityLinks = new ActivityLink[] { activityLink.Value };
+ }
+
+ return StartActivity(operationName, ActivityKind.Consumer, tags,
activityLinks, message.GetConversationId());
}
public static Activity? StartProducerActivity(MessageMetadata metadata,
string operationName, KeyValuePair<string, object?>[] tags)
@@ -42,21 +51,33 @@ public static class DotPulsarActivitySource
if (!ActivitySource.HasListeners())
return null;
- return StartActivity(operationName, ActivityKind.Producer, tags,
metadata.GetConversationId());
+ return StartActivity(operationName, ActivityKind.Producer, tags, null,
metadata.GetConversationId());
+ }
+
+ private static ActivityLink? GetActivityLink(IMessage message)
+ {
+ if (message.Properties.TryGetValue(Constants.TraceParent, out var
traceParent))
+ {
+ _ = message.Properties.TryGetValue(Constants.TraceState, out var
traceState);
+
+ if (ActivityContext.TryParse(traceParent, traceState, out var
context))
+ return new ActivityLink(context);
+ }
+
+ return null;
}
- private static Activity? StartActivity(string operationName, ActivityKind
kind, KeyValuePair<string, object?>[] tags, string? conversationId)
+ private static Activity? StartActivity(
+ string operationName,
+ ActivityKind kind,
+ KeyValuePair<string, object?>[] tags,
+ IEnumerable<ActivityLink>? activityLinks,
+ string? conversationId)
{
- var activity = ActivitySource.StartActivity(operationName, kind);
+ var activity = ActivitySource.StartActivity(kind, name: operationName,
tags: tags, links: activityLinks);
if (activity is not null && activity.IsAllDataRequested)
{
- for (var i = 0; i < tags.Length; ++i)
- {
- var tag = tags[i];
- activity.SetTag(tag.Key, tag.Value);
- }
-
if (conversationId is not null)
activity.SetConversationId(conversationId);
}
diff --git a/src/DotPulsar/Internal/MessageProcessor.cs
b/src/DotPulsar/Internal/MessageProcessor.cs
index c3355f4..db6ece6 100644
--- a/src/DotPulsar/Internal/MessageProcessor.cs
+++ b/src/DotPulsar/Internal/MessageProcessor.cs
@@ -36,7 +36,8 @@ public sealed class MessageProcessor<TMessage> : IDisposable
private readonly SemaphoreSlim _receiveLock;
private readonly SemaphoreSlim _acknowledgeLock;
private readonly ObjectPool<ProcessInfo> _processInfoPool;
- private readonly bool _ensureOrderedAcknowledgement;
+ private readonly bool _linkTraces;
+ private readonly bool _ensureOrderedAcknowledgment;
private readonly int _maxDegreeOfParallelism;
private readonly int _maxMessagesPerTask;
private readonly TaskScheduler _taskScheduler;
@@ -70,7 +71,8 @@ public sealed class MessageProcessor<TMessage> : IDisposable
_acknowledgeLock = new SemaphoreSlim(1, 1);
_processInfoPool = new DefaultObjectPool<ProcessInfo>(new
DefaultPooledObjectPolicy<ProcessInfo>());
- _ensureOrderedAcknowledgement = options.EnsureOrderedAcknowledgement;
+ _linkTraces = options.LinkTraces;
+ _ensureOrderedAcknowledgment = options.EnsureOrderedAcknowledgment;
_maxDegreeOfParallelism = options.MaxDegreeOfParallelism;
_maxMessagesPerTask = options.MaxMessagesPerTask;
_taskScheduler = options.TaskScheduler;
@@ -103,7 +105,7 @@ public sealed class MessageProcessor<TMessage> : IDisposable
var processInfo = new ProcessInfo();
- var needToEnsureOrderedAcknowledgement = _ensureOrderedAcknowledgement
&& _maxDegreeOfParallelism > 1;
+ var needToEnsureOrderedAcknowledgement = _ensureOrderedAcknowledgment
&& _maxDegreeOfParallelism > 1;
var isUnbounded = _maxMessagesPerTask == ProcessingOptions.Unbounded;
while (!cancellationToken.IsCancellationRequested)
@@ -124,7 +126,7 @@ public sealed class MessageProcessor<TMessage> : IDisposable
_receiveLock.Release();
}
- var activity =
DotPulsarActivitySource.StartConsumerActivity(message, _operationName,
_activityTags);
+ var activity =
DotPulsarActivitySource.StartConsumerActivity(message, _operationName,
_activityTags, _linkTraces);
if (activity is not null && activity.IsAllDataRequested)
{
activity.SetMessageId(message.MessageId);
@@ -191,7 +193,7 @@ public sealed class MessageProcessor<TMessage> : IDisposable
var processorTask = Task.Factory.StartNew(
async () => await
Processor(cancellationToken).ConfigureAwait(false),
cancellationToken,
- TaskCreationOptions.None,
+ TaskCreationOptions.DenyChildAttach,
_taskScheduler).Unwrap();
_processorTasks.AddLast(processorTask);
diff --git a/src/DotPulsar/Internal/Producer.cs
b/src/DotPulsar/Internal/Producer.cs
index ba9fc0d..ba6567d 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -32,6 +32,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>,
IRegisterEvent
private readonly string _operationName;
private readonly KeyValuePair<string, object?>[] _activityTags;
private readonly KeyValuePair<string, object?>[] _meterTags;
+ private readonly bool _attachTraceInfoToMessages;
private readonly SequenceId _sequenceId;
private readonly StateManager<ProducerState> _state;
private readonly IConnectionPool _connectionPool;
@@ -70,6 +71,7 @@ public sealed class Producer<TMessage> : IProducer<TMessage>,
IRegisterEvent
{
new KeyValuePair<string, object?>("topic", options.Topic)
};
+ _attachTraceInfoToMessages = options.AttachTraceInfoToMessages;
_sequenceId = new SequenceId(options.InitialSequenceId);
_state = new StateManager<ProducerState>(ProducerState.Disconnected,
ProducerState.Closed, ProducerState.Faulted);
ServiceUrl = serviceUrl;
@@ -245,6 +247,13 @@ public sealed class Producer<TMessage> :
IProducer<TMessage>, IRegisterEvent
metadata.SequenceId = _sequenceId.FetchNext();
var activity = DotPulsarActivitySource.StartProducerActivity(metadata,
_operationName, _activityTags);
+ if (activity is not null && _attachTraceInfoToMessages)
+ {
+ metadata[Constants.TraceParent] = activity.Id;
+ if (activity.TraceStateString is not null)
+ metadata[Constants.TraceState] = activity.TraceStateString;
+ }
+
var startTimestamp = DotPulsarMeter.MessageSentEnabled ?
Stopwatch.GetTimestamp() : 0;
try
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs
b/src/DotPulsar/Internal/ProducerBuilder.cs
index 12859f4..c449319 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -22,6 +22,7 @@ public sealed class ProducerBuilder<TMessage> :
IProducerBuilder<TMessage>
private readonly IPulsarClient _pulsarClient;
private readonly ISchema<TMessage> _schema;
private string? _producerName;
+ private bool _attachTraceInfoToMessages;
private CompressionType _compressionType;
private ulong _initialSequenceId;
private string? _topic;
@@ -32,10 +33,17 @@ public sealed class ProducerBuilder<TMessage> :
IProducerBuilder<TMessage>
{
_pulsarClient = pulsarClient;
_schema = schema;
+ _attachTraceInfoToMessages = false;
_compressionType = ProducerOptions<TMessage>.DefaultCompressionType;
_initialSequenceId =
ProducerOptions<TMessage>.DefaultInitialSequenceId;
}
+ public IProducerBuilder<TMessage> AttachTraceInfoToMessages(bool
attachTraceInfoToMessages)
+ {
+ _attachTraceInfoToMessages = attachTraceInfoToMessages;
+ return this;
+ }
+
public IProducerBuilder<TMessage> CompressionType(CompressionType
compressionType)
{
_compressionType = compressionType;
@@ -79,6 +87,7 @@ public sealed class ProducerBuilder<TMessage> :
IProducerBuilder<TMessage>
var options = new ProducerOptions<TMessage>(_topic!, _schema)
{
+ AttachTraceInfoToMessages = _attachTraceInfoToMessages,
CompressionType = _compressionType,
InitialSequenceId = _initialSequenceId,
ProducerName = _producerName,
diff --git a/src/DotPulsar/ProcessingOptions.cs
b/src/DotPulsar/ProcessingOptions.cs
index d01a77c..7bcca96 100644
--- a/src/DotPulsar/ProcessingOptions.cs
+++ b/src/DotPulsar/ProcessingOptions.cs
@@ -27,7 +27,8 @@ public sealed class ProcessingOptions
/// </summary>
public const int Unbounded = -1;
- private bool _ensureOrderedAcknowledgement;
+ private bool _ensureOrderedAcknowledgment;
+ private bool _linkTraces;
private int _maxDegreeOfParallelism;
private int _maxMessagesPerTask;
private TaskScheduler _taskScheduler;
@@ -37,19 +38,29 @@ public sealed class ProcessingOptions
/// </summary>
public ProcessingOptions()
{
- _ensureOrderedAcknowledgement = true;
+ _ensureOrderedAcknowledgment = true;
+ _linkTraces = false;
_maxDegreeOfParallelism = 1;
_maxMessagesPerTask = Unbounded;
_taskScheduler = TaskScheduler.Default;
}
/// <summary>
- /// Whether ordered acknowledgement should be enforced. The default is
'true'.
+ /// Whether ordered acknowledgment should be enforced. The default is
'true'.
/// </summary>
- public bool EnsureOrderedAcknowledgement
+ public bool EnsureOrderedAcknowledgment
{
- get => _ensureOrderedAcknowledgement;
- set { _ensureOrderedAcknowledgement = value; }
+ get => _ensureOrderedAcknowledgment;
+ set { _ensureOrderedAcknowledgment = value; }
+ }
+
+ /// <summary>
+ /// Whether to link the process trace to the message's send trace, if
tracing is enabled. The default is 'false'.
+ /// </summary>
+ public bool LinkTraces
+ {
+ get => _linkTraces;
+ set { _linkTraces = value; }
}
/// <summary>
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index 665fc4d..181eb09 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -36,6 +36,7 @@ public sealed class ProducerOptions<TMessage>
/// </summary>
public ProducerOptions(string topic, ISchema<TMessage> schema)
{
+ AttachTraceInfoToMessages = false;
CompressionType = DefaultCompressionType;
InitialSequenceId = DefaultInitialSequenceId;
Topic = topic;
@@ -43,6 +44,11 @@ public sealed class ProducerOptions<TMessage>
MessageRouter = new RoundRobinPartitionRouter();
}
+ /// <summary>
+ /// Whether to attach the sending trace's parent and state to the outgoing
messages' metadata. The default is 'false'.
+ /// </summary>
+ public bool AttachTraceInfoToMessages { get; set; }
+
/// <summary>
/// Set the compression type. The default is 'None'.
/// </summary>