This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
     new e0388dd  Created PIP 23: Message Tracing By Interceptors (markdown)
e0388dd is described below

commit e0388dd8484f562109c6a4e689983883d7c1beba
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Tue Aug 21 03:47:09 2018 -0700

    Created PIP 23: Message Tracing By Interceptors (markdown)
---
 PIP-23:-Message-Tracing-By-Interceptors.md | 239 +++++++++++++++++++++++++++++
 1 file changed, 239 insertions(+)

diff --git a/PIP-23:-Message-Tracing-By-Interceptors.md b/PIP-23:-Message-Tracing-By-Interceptors.md
new file mode 100644
index 0000000..d57d558
--- /dev/null
+++ b/PIP-23:-Message-Tracing-By-Interceptors.md
@@ -0,0 +1,239 @@
+- **Status**: Proposed
+- **Author**: [Penghui Li](https://github.com/codelipenghui)
+- **Pull Request**: -
+- **Mailing List discussion**: 
+
+## Requirement & Overview
+
+This proposal is related to issue #2280. Tracing is also useful for Pulsar itself: it helps track down performance issues and understand latency breakdowns. Tracing is the easiest way to answer a request such as "show me a request with high latency, and show where the latency for that particular request came from."
+
+## GOALS
+
+A good approach is to integrate a tracing standard, either OpenTracing or OpenCensus, into both BookKeeper and Pulsar. Applications can then integrate with OpenTracing or OpenCensus as well, so that an application's traceId and spanId are passed all the way from producer to broker to bookie to consumer. Those traces can then be collected and fed into an Elasticsearch cluster for querying.
+
+## IMPLEMENTATION
+
+Integration with a tracing system typically requires tight integration with applications. Especially when Pulsar is used in a data pipeline, Pulsar should provide a mechanism to propagate trace context end-to-end. That said, Pulsar doesn’t have to integrate with any tracing framework directly, but it has to provide a mechanism for applications to do so. This can be done by:
+
+- Using `properties` in the message header to propagate trace context.
+- Providing an interface to intercept the message pipeline in Pulsar, which allows applications to customize their own tracing behavior to meet their requirements.
+
+This PIP introduces **Interceptor** to examine (and potentially modify) messages at key places during the lifecycle of a Pulsar message. The interceptors include `ProducerInterceptor` for intercepting messages on the producer side, and `ConsumerInterceptor` for intercepting messages on the consumer side.
+
+### Producer Interceptor
+
+The `ProducerInterceptor` intercepts messages before they are sent and when acknowledgements are received from brokers. The interface for a `ProducerInterceptor` is defined as below:
+
+```java
+package org.apache.pulsar.client.api;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate) the
+ * messages received by the producer before they are published to the Pulsar
+ * brokers.
+ * <p>
+ * Exceptions thrown by ProducerInterceptor methods will be caught, logged, but
+ * not propagated further.
+ * <p>
+ * ProducerInterceptor callbacks may be called from multiple threads. Interceptor
+ * implementations must ensure thread-safety, if needed.
+ */
+public interface ProducerInterceptor<T> extends AutoCloseable {
+
+    /**
+     * Close the interceptor.
+     */
+    void close();
+
+    /**
+     * This is called from {@link Producer#send(Object)} and
+     * {@link Producer#sendAsync(Object)}, before the message is sent to the
+     * brokers. This method is allowed to modify the message, in which case the
+     * new message will be returned.
+     * <p>
+     * Any exception thrown by this method will be caught by the caller and
+     * logged, but not propagated further.
+     * <p>
+     * Since the producer may run multiple interceptors, a particular
+     * interceptor's {@link #beforeSend(Message)} callback will be called in the
+     * order specified by
+     * {@link ProducerBuilder#intercept(ProducerInterceptor[])}.
+     * <p>
+     * The first interceptor in the list gets the message passed from the client,
+     * the following interceptor will be passed the message returned by the
+     * previous interceptor, and so on. Since interceptors are allowed to modify
+     * messages, interceptors may potentially get the message already modified by
+     * other interceptors. However, building a pipeline of mutable interceptors
+     * that depend on the output of the previous interceptor is discouraged,
+     * because of potential side-effects caused by interceptors potentially
+     * failing to modify the message and throwing an exception. If one of the
+     * interceptors in the list throws an exception from
+     * {@link #beforeSend(Message)}, the exception is caught, logged, and the next
+     * interceptor is called with the message returned by the last successful
+     * interceptor in the list, or otherwise the message passed from the client.
+     *
+     * @param message message to send
+     * @return the intercepted message
+     */
+    Message<T> beforeSend(Message<T> message);
+
+    /**
+     * This method is called when the message sent to the broker has been
+     * acknowledged, or when sending the message fails.
+     * This method is generally called just before the user callback is
+     * called, and additionally when an exception occurs on the producer side.
+     * <p>
+     * Any exception thrown by this method will be ignored by the caller.
+     * <p>
+     * This method will generally execute in the background I/O thread, so the
+     * implementation should be reasonably fast. Otherwise, sending of messages
+     * from other threads could be delayed.
+     *
+     * @param message the message that the application sends
+     * @param msgId the message id assigned by the broker; null if the send failed.
+     * @param cause the exception thrown while sending the message; null indicates the send has succeeded.
+     */
+    void onSendAcknowledgement(Message<T> message, MessageId msgId, Throwable cause);
+
+}
+```
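The error-handling rule in the `beforeSend` Javadoc (a failing interceptor is skipped and the next one receives the output of the last successful interceptor) can be illustrated with a minimal sketch. `BeforeSendChain` and its `apply` method are hypothetical names, not part of the Pulsar client, and a plain `UnaryOperator` stands in for `ProducerInterceptor#beforeSend` so the example runs without the client library.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/**
 * Hypothetical helper (not part of the Pulsar client API) that applies a list
 * of beforeSend-style callbacks with the error handling described in the
 * ProducerInterceptor Javadoc.
 */
final class BeforeSendChain {

    private BeforeSendChain() {
    }

    /**
     * Runs each interceptor in order. If one throws, the exception is handed to
     * {@code onError} (a stand-in for logging) and the next interceptor receives
     * the message returned by the last successful interceptor.
     */
    static <M> M apply(M original,
                       List<UnaryOperator<M>> interceptors,
                       Consumer<RuntimeException> onError) {
        M current = original;
        for (UnaryOperator<M> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // Swallowed on purpose: the failure never reaches the sender.
                onError.accept(e);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = Arrays.asList(
                m -> m + "|a",
                m -> { throw new IllegalStateException("interceptor failed"); },
                m -> m + "|c");
        String sent = apply("payload", chain, e -> System.err.println("logged: " + e));
        System.out.println(sent);
    }
}
```

In this sketch the third callback sees the output of the first one, because the second callback failed before returning anything. That is the side-effect the Javadoc warns about when interceptors depend on each other's modifications.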
+
+### Consumer Interceptor
+
+The `ConsumerInterceptor` intercepts messages before they are delivered to consumers and before acknowledgements are sent to brokers. The interface for a `ConsumerInterceptor` is defined as below:
+
+```java
+package org.apache.pulsar.client.api;
+
+import java.util.List;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate)
+ * messages received by the consumer.
+ *
+ * <p>A primary use case is to hook into consumer applications for custom
+ * monitoring, logging, etc.
+ *
+ * <p>Exceptions thrown by interceptor methods will be caught, logged, but
+ * not propagated further.
+ */
+public interface ConsumerInterceptor<T> extends AutoCloseable {
+
+    /**
+     * Close the interceptor.
+     */
+    void close();
+
+    /**
+     * This is called just before the message is returned by
+     * {@link Consumer#receive()}, passed to
+     * {@link MessageListener#received(Consumer, Message)}, or before the
+     * {@link java.util.concurrent.CompletableFuture} returned by
+     * {@link Consumer#receiveAsync()} completes.
+     * <p>
+     * This method is allowed to modify the message, in which case the new message
+     * will be returned.
+     * <p>
+     * Any exception thrown by this method will be caught by the caller, logged,
+     * but not propagated to the client.
+     * <p>
+     * Since the consumer may run multiple interceptors, a particular
+     * interceptor's <tt>beforeConsume</tt> callback will be called in the order
+     * specified by {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}. The
+     * first interceptor in the list gets the consumed message, the following
+     * interceptor will be passed the message returned by the previous
+     * interceptor, and so on. Since interceptors are allowed to modify messages,
+     * interceptors may potentially get messages already modified by other
+     * interceptors. However, building a pipeline of mutable interceptors that
+     * depend on the output of the previous interceptor is discouraged, because
+     * of potential side-effects caused by interceptors potentially failing to
+     * modify the message and throwing an exception. If one of the interceptors
+     * in the list throws an exception from <tt>beforeConsume</tt>, the exception
+     * is caught, logged, and the next interceptor is called with the message
+     * returned by the last successful interceptor in the list, or otherwise the
+     * original consumed message.
+     *
+     * @param message the message to be consumed by the client.
+     * @return the message, either modified by the interceptor or the same
+     *         message passed into the method.
+     */
+    Message<T> beforeConsume(Message<T> message);
+
+    /**
+     * This is called when the consumer sends the acknowledgment to the broker.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param message the message to acknowledge; null if the acknowledgment fails.
+     * @param cause the exception on acknowledgment.
+     */
+    void onAcknowledge(Message<T> message, Throwable cause);
+
+    /**
+     * This is called when the consumer sends the cumulative acknowledgment to the
+     * broker.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param messages the messages to acknowledge cumulatively; null if the acknowledgment fails.
+     * @param cause the exception on acknowledgment.
+     */
+    void onAcknowledgeCumulative(List<Message<T>> messages, Throwable cause);
+
+}
+```
+
+### Configure Interceptors
+
+Applications can configure interceptors via `ProducerBuilder` and `ConsumerBuilder`.
+
+```java
+/**
+ * Intercept {@link Producer}.
+ *
+ * @param interceptors the list of interceptors to intercept the producer created by this builder.
+ * @return producer builder.
+ */
+ProducerBuilder<T> intercept(ProducerInterceptor<T>... interceptors);
+```
+
+```java
+/**
+ * Intercept {@link Consumer}.
+ *
+ * @param interceptors the list of interceptors to intercept the consumer created by this builder.
+ * @return consumer builder.
+ */
+ConsumerBuilder<T> intercept(ConsumerInterceptor<T>... interceptors);
+```
+
+### Tracing Interceptor
+
+Applications can choose their preferred tracing framework for integration. The tracing framework can be either OpenTracing or OpenCensus. Pulsar doesn’t enforce any tracing framework.
+
+The following shows an example of how end-to-end tracing with OpenTracing works using interceptors in Pulsar.
+
+#### Trace Context
+
+To achieve end-to-end tracing, the most important thing is to be able to propagate trace context from producer to consumer. This trace context can be serialized into the properties in the message header and carried as part of the message.
+
+Using OpenTracing as an example:
+
+- The trace context carrier is the Message.
+- A trace context Format needs to be defined that specifies how to serialize and deserialize the trace context to and from the properties of this Message.
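The two points above can be sketched as a small format class that writes a trace context into message properties and reads it back. Everything here is an assumption made for illustration: `TraceContext`, `PropertiesTraceFormat`, and the property keys `trace-id` and `span-id` are hypothetical, and a plain `Map<String, String>` stands in for the properties of a Pulsar message. A real integration would delegate to the inject and extract operations of the chosen tracing library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Minimal trace context; a hypothetical stand-in for a tracing library's span context. */
final class TraceContext {
    final String traceId;
    final String spanId;

    TraceContext(String traceId, String spanId) {
        this.traceId = traceId;
        this.spanId = spanId;
    }
}

/** Hypothetical format that maps a TraceContext to and from message properties. */
final class PropertiesTraceFormat {
    // Property keys are assumptions for this sketch, not names defined by Pulsar.
    static final String TRACE_ID_KEY = "trace-id";
    static final String SPAN_ID_KEY = "span-id";

    private PropertiesTraceFormat() {
    }

    /** Returns a copy of the properties with the trace context added. */
    static Map<String, String> inject(TraceContext ctx, Map<String, String> properties) {
        Map<String, String> copy = new HashMap<>(properties);
        copy.put(TRACE_ID_KEY, ctx.traceId);
        copy.put(SPAN_ID_KEY, ctx.spanId);
        return copy;
    }

    /** Reads the trace context back, or returns empty if either key is missing. */
    static Optional<TraceContext> extract(Map<String, String> properties) {
        String traceId = properties.get(TRACE_ID_KEY);
        String spanId = properties.get(SPAN_ID_KEY);
        if (traceId == null || spanId == null) {
            return Optional.empty();
        }
        return Optional.of(new TraceContext(traceId, spanId));
    }
}
```

Returning a copy from `inject` leaves the caller's map untouched, which keeps the sketch free of shared mutable state.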
+
+#### Producer Tracing Interceptor
+
+A tracing interceptor is pretty straightforward:
+
+- In the implementation of the `beforeSend` method, it should first try to deserialize a trace context (because the application that uses the Pulsar producer might already have set a trace context in the message header). If there is a trace context, it should use that trace context; otherwise it should start a new trace span.
+- Once a trace context is established, it can record any information that needs to be traced.
+
+Similarly,
+
+- In the implementation of `onSendAcknowledgement`, it can retrieve the trace context and record any information that needs to be traced on acknowledgements.
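A minimal sketch of these producer-side steps follows. It is an illustration under stated assumptions, not the PIP's implementation: `ProducerTracingSketch` and the `trace-id` property key are hypothetical, the methods take a plain properties map instead of a Pulsar `Message` so the code runs without the client library, and send latency is used as the example of "information that needs to be traced".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of the producer-side tracing steps. It works on a plain
 * properties map instead of a Pulsar Message so that it runs without the client.
 */
final class ProducerTracingSketch {
    // Assumed property key; Pulsar does not define it.
    static final String TRACE_ID_KEY = "trace-id";

    // Send start times keyed by trace id. A concurrent map is used because
    // interceptor callbacks may be invoked from multiple threads.
    private final Map<String, Long> startNanos = new ConcurrentHashMap<>();

    /** beforeSend step: reuse the application's trace id, or start a new trace. */
    Map<String, String> beforeSend(Map<String, String> properties) {
        Map<String, String> traced = new HashMap<>(properties);
        String traceId = traced.get(TRACE_ID_KEY);
        if (traceId == null) {
            traceId = UUID.randomUUID().toString();
            traced.put(TRACE_ID_KEY, traceId);
        }
        startNanos.put(traceId, System.nanoTime());
        return traced;
    }

    /**
     * onSendAcknowledgement step: look up the trace and return the elapsed
     * nanoseconds since beforeSend, or -1 if the trace is unknown.
     */
    long onSendAcknowledgement(Map<String, String> properties) {
        String traceId = properties.get(TRACE_ID_KEY);
        Long start = traceId == null ? null : startNanos.remove(traceId);
        return start == null ? -1L : System.nanoTime() - start;
    }
}
```

Keying the start times by trace id assumes one in-flight message per trace. A real interceptor would more likely key by a span id or keep the span object itself.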
+
+#### Consumer Tracing Interceptor
+
+Implementing a consumer tracing interceptor is similar to implementing a producer tracing interceptor, as described above.
\ No newline at end of file
