sijie closed pull request #2471: PIP-23: Pulsar Java Client Interceptors.
URL: https://github.com/apache/incubator-pulsar/pull/2471
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index ae1a4dba17..99aec9355b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -111,7 +111,7 @@ public String toString() {
                 consumerFuture,
                 SubscriptionMode.Durable,
                 MessageId.earliest,
-                Schema.BYTES);
+                Schema.BYTES, null);
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index f8ac40a945..8c175d7326 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -24,6 +24,7 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.matches;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
@@ -36,7 +37,6 @@
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -80,7 +80,6 @@
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.Topic.PublishContext;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -1224,7 +1223,7 @@ public void testClosingReplicationProducerTwice() throws 
Exception {
         verify(clientImpl)
             .createProducerAsync(
                 any(ProducerConfigurationData.class),
-                any(Schema.class)
+                any(Schema.class), eq(null)
             );
 
         replicator.disconnect(false);
@@ -1235,7 +1234,7 @@ public void testClosingReplicationProducerTwice() throws 
Exception {
         verify(clientImpl, Mockito.times(2))
             .createProducerAsync(
                 any(ProducerConfigurationData.class),
-                any(Schema.class)
+                any(Schema.class), any(null)
             );
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index de4992594d..528ff64d7e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
@@ -242,7 +243,7 @@ public void testConcurrentReplicator() throws Exception {
         Thread.sleep(3000);
 
         Mockito.verify(pulsarClient, 
Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class),
-                Mockito.any(Schema.class));
+                Mockito.any(Schema.class), eq(null));
 
         client1.shutdown();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
index 0d6d3036ef..92bb64175a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.v1;
 
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
@@ -241,7 +242,7 @@ public void testConcurrentReplicator() throws Exception {
         Thread.sleep(3000);
 
         Mockito.verify(pulsarClient, 
Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class),
-                Mockito.any(Schema.class));
+                Mockito.any(Schema.class), eq(null));
 
         client1.shutdown();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
new file mode 100644
index 0000000000..d83384698e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InterceptorsTest extends ProducerConsumerBase {
+
+    private static final Logger log = 
LoggerFactory.getLogger(InterceptorsTest.class);
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testProducerInterceptor() throws PulsarClientException {
+        ProducerInterceptor<String> interceptor1 = new 
ProducerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeSend(Producer<String> producer, 
Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                log.info("Before send message: {}", new String(msg.getData()));
+                
java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> 
properties = msg.getMessageBuilder().getPropertiesList();
+                for (int i = 0; i < properties.size(); i++) {
+                    if ("key".equals(properties.get(i).getKey())) {
+                        msg.getMessageBuilder().setProperties(i, 
PulsarApi.KeyValue.newBuilder().setKey("key").setValue("after").build());
+                    }
+                }
+                return msg;
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer<String> producer, 
Message<String> message, MessageId msgId, Throwable cause) {
+                message.getProperties();
+                Assert.assertEquals("complete", message.getProperty("key"));
+                log.info("Send acknowledgement message: {}, msgId: {}", new 
String(message.getData()), msgId, cause);
+            }
+        };
+
+        ProducerInterceptor<String> interceptor2 = new 
ProducerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeSend(Producer<String> producer, 
Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                log.info("Before send message: {}", new String(msg.getData()));
+                
java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> 
properties = msg.getMessageBuilder().getPropertiesList();
+                for (int i = 0; i < properties.size(); i++) {
+                    if ("key".equals(properties.get(i).getKey())) {
+                        msg.getMessageBuilder().setProperties(i, 
PulsarApi.KeyValue.newBuilder().setKey("key").setValue("complete").build());
+                    }
+                }
+                return msg;
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer<String> producer, 
Message<String> message, MessageId msgId, Throwable cause) {
+                message.getProperties();
+                Assert.assertEquals("complete", message.getProperty("key"));
+                log.info("Send acknowledgement message: {}, msgId: {}", new 
String(message.getData()), msgId, cause);
+            }
+        };
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .intercept(interceptor1, interceptor2)
+                .create();
+
+        MessageId messageId = producer.newMessage().property("key", 
"before").value("Hello Pulsar!").send();
+        log.info("Send result messageId: {}", messageId);
+        producer.close();
+    }
+
+    @Test
+    public void testConsumerInterceptorWithSingleTopicSubscribe() throws 
PulsarClientException {
+        ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, 
Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                
msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+                return msg;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId 
messageId, Throwable cause) {
+                log.info("onAcknowledge messageId: {}", messageId, cause);
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, 
MessageId messageId, Throwable cause) {
+                log.info("onAcknowledgeCumulative messageIds: {}", messageId, 
cause);
+            }
+        };
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .subscriptionType(SubscriptionType.Shared)
+                .intercept(interceptor)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+
+        producer.newMessage().value("Hello Pulsar!").send();
+
+        Message<String> received = consumer.receive();
+        MessageImpl<String> msg = (MessageImpl<String>) received;
+        boolean haveKey = false;
+        for (PulsarApi.KeyValue keyValue : 
msg.getMessageBuilder().getPropertiesList()) {
+            if ("beforeConsumer".equals(keyValue.getKey())) {
+                haveKey = true;
+            }
+        }
+        Assert.assertTrue(haveKey);
+        consumer.acknowledge(received);
+        producer.close();
+        consumer.close();
+    }
+
+    @Test
+    public void testConsumerInterceptorWithMultiTopicSubscribe() throws 
PulsarClientException {
+
+        ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, 
Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                
msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+                return msg;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId 
messageId, Throwable cause) {
+                log.info("onAcknowledge messageId: {}", messageId, cause);
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, 
MessageId messageId, Throwable cause) {
+                log.info("onAcknowledgeCumulative messageIds: {}", messageId, 
cause);
+            }
+        };
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+
+        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic1")
+                .create();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic", 
"persistent://my-property/my-ns/my-topic1")
+                .subscriptionType(SubscriptionType.Shared)
+                .intercept(interceptor)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        producer.newMessage().value("Hello Pulsar!").send();
+        producer1.newMessage().value("Hello Pulsar!").send();
+
+        int keyCount = 0;
+        for (int i = 0; i < 2; i++) {
+            Message<String> received = consumer.receive();
+            MessageImpl<String> msg = (MessageImpl<String>) 
((TopicMessageImpl<String>) received).getMessage();
+            for (PulsarApi.KeyValue keyValue : 
msg.getMessageBuilder().getPropertiesList()) {
+                if ("beforeConsumer".equals(keyValue.getKey())) {
+                    keyCount++;
+                }
+            }
+            consumer.acknowledge(received);
+        }
+        Assert.assertEquals(2, keyCount);
+        producer.close();
+        producer1.close();
+        consumer.close();
+    }
+
+    @Test
+    public void testConsumerInterceptorWithPatternTopicSubscribe() throws 
PulsarClientException {
+
+        ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, 
Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                
msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+                return msg;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId 
messageId, Throwable cause) {
+                log.info("onAcknowledge messageId: {}", messageId, cause);
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, 
MessageId messageId, Throwable cause) {
+                log.info("onAcknowledgeCumulative messageIds: {}", messageId, 
cause);
+            }
+        };
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+
+        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic1")
+                .create();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topicsPattern("persistent://my-property/my-ns/my-.*")
+                .subscriptionType(SubscriptionType.Shared)
+                .intercept(interceptor)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        producer.newMessage().value("Hello Pulsar!").send();
+        producer1.newMessage().value("Hello Pulsar!").send();
+
+        int keyCount = 0;
+        for (int i = 0; i < 2; i++) {
+            Message<String> received = consumer.receive();
+            MessageImpl<String> msg = (MessageImpl<String>) 
((TopicMessageImpl<String>) received).getMessage();
+            for (PulsarApi.KeyValue keyValue : 
msg.getMessageBuilder().getPropertiesList()) {
+                if ("beforeConsumer".equals(keyValue.getKey())) {
+                    keyCount++;
+                }
+            }
+            consumer.acknowledge(received);
+        }
+        Assert.assertEquals(2, keyCount);
+        producer.close();
+        producer1.close();
+        consumer.close();
+    }
+
+    @Test
+    public void testConsumerInterceptorForAcknowledgeCumulative() throws 
PulsarClientException {
+
+        List<MessageId> ackHolder = new ArrayList<>();
+
+        ConsumerInterceptor<String> interceptor = new 
ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, 
Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                
msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+                return msg;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId 
messageId, Throwable cause) {
+                log.info("onAcknowledge messageId: {}", messageId, cause);
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, 
MessageId messageId, Throwable cause) {
+                long acknowledged = ackHolder.stream().filter(m -> 
(m.compareTo(messageId) <= 0)).count();
+                Assert.assertEquals(acknowledged, 100);
+                ackHolder.clear();
+                log.info("onAcknowledgeCumulative messageIds: {}", messageId, 
cause);
+            }
+        };
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .subscriptionType(SubscriptionType.Failover)
+                .intercept(interceptor)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().value("Hello Pulsar!").send();
+        }
+
+        int keyCount = 0;
+        for (int i = 0; i < 100; i++) {
+            Message<String> received = consumer.receive();
+            MessageImpl<String> msg = (MessageImpl<String>) received;
+            for (PulsarApi.KeyValue keyValue : 
msg.getMessageBuilder().getPropertiesList()) {
+                if ("beforeConsumer".equals(keyValue.getKey())) {
+                    keyCount++;
+                }
+            }
+            ackHolder.add(received.getMessageId());
+            if (i == 99) {
+                consumer.acknowledgeCumulative(received);
+            }
+        }
+        Assert.assertEquals(100, keyCount);
+        producer.close();
+        consumer.close();
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 69d885fb78..f5a926602d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -288,4 +288,10 @@
      * @return Whether the consumer is connected to the broker
      */
     boolean isConnected();
+
+    /**
+     * Get the name of consumer.
+     * @return consumer name.
+     */
+    String getConsumerName();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 8657859103..da1cbb5d17 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -330,4 +330,12 @@
      * Set subscriptionInitialPosition for the consumer
     */
     ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition 
subscriptionInitialPosition);
+
+    /**
+     * Intercept {@link Consumer}.
+     *
+     * @param interceptors the list of interceptors to intercept the consumer 
created by this builder.
+     * @return consumer builder.
+     */
+    ConsumerBuilder<T> intercept(ConsumerInterceptor<T> ...interceptors);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
new file mode 100644
index 0000000000..1134d8a2b4
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.List;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate)
+ * messages received by the consumer.
+ * <p>
+ * A primary use case is to hook into consumer applications for custom
+ * monitoring, logging, etc.
+ * <p>
+ * Exceptions thrown by interceptor methods will be caught, logged, but
+ * not propagated further.
+ */
+public interface ConsumerInterceptor<T> extends AutoCloseable {
+
+    /**
+     * Close the interceptor.
+     */
+    void close();
+
+    /**
+     * This is called just before the message is returned by
+     * {@link Consumer#receive()}, {@link MessageListener#received(Consumer,
+     * Message)} or the {@link java.util.concurrent.CompletableFuture} 
returned by
+     * {@link Consumer#receiveAsync()} completes.
+     * <p>
+     * This method is allowed to modify message, in which case the new message
+     * will be returned.
+     * <p>
+     * Any exception thrown by this method will be caught by the caller, 
logged,
+     * but not propagated to client.
+     * <p>
+     * Since the consumer may run multiple interceptors, a particular
+     * interceptor's
+     * <tt>beforeConsume</tt> callback will be called in the order specified by
+     * {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}. The first
+     * interceptor in the list gets the consumed message, the following
+     * interceptor will be passed
+     * the message returned by the previous interceptor, and so on. Since
+     * interceptors are allowed to modify message, interceptors may potentially
+     * get the messages already modified by other interceptors. However 
building a
+     * pipeline of mutable
+     * interceptors that depend on the output of the previous interceptor is
+     * discouraged, because of potential side-effects caused by interceptors
+     * potentially failing to modify the message and throwing an exception.
+     * if one of interceptors in the list throws an exception from
+     * <tt>beforeConsume</tt>, the exception is caught, logged,
+     * and the next interceptor is called with the message returned by the last
+     * successful interceptor in the list, or otherwise the original consumed
+     * message.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param message the message to be consumed by the client.
+     * @return message that is either modified by the interceptor or same 
message
+     *         passed into the method.
+     */
+    Message<T> beforeConsume(Consumer<T> consumer, Message<T> message);
+
+    /**
+     * This is called consumer sends the acknowledgment to the broker.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param messageId message to ack, null if acknowledge fail.
+     * @param exception the exception on acknowledge.
+     */
+    void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable 
exception);
+
+    /**
+     * This is called consumer send the cumulative acknowledgment to the 
broker.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param messageId message to ack, null if acknowledge fail.
+     * @param exception the exception on acknowledge.
+     */
+    void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, 
Throwable exception);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 8256b4a282..b3aa720902 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -318,4 +318,12 @@
      * @return
      */
     ProducerBuilder<T> properties(Map<String, String> properties);
+
+    /**
+     * Intercept {@link Producer}.
+     *
+     * @param interceptors the list of interceptors to intercept the producer 
created by this builder.
+     * @return producer builder.
+     */
+    ProducerBuilder<T> intercept(ProducerInterceptor<T> ... interceptors);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java
new file mode 100644
index 0000000000..b6a0d77558
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate) the
+ * messages received by the producer before they are published to the Pulsar
+ * brokers.
+ * <p>
+ * Exceptions thrown by ProducerInterceptor methods will be caught, logged, but
+ * not propagated further.
+ * <p>
+ * ProducerInterceptor callbacks may be called from multiple threads. 
Interceptor
+ * implementation must ensure thread-safety, if needed.
+ */
+public interface ProducerInterceptor<T> extends AutoCloseable {
+
+    /**
+     * Close the interceptor.
+     */
+    void close();
+
+    /**
+     * This is called from {@link Producer#send(Object)} and {@link
+     * Producer#sendAsync(Object)} methods, before
+     * send the message to the brokers. This method is allowed to modify the
+     * record, in which case, the new record
+     * will be returned.
+     * <p>
+     * Any exception thrown by this method will be caught by the caller and
+     * logged, but not propagated further.
+     * <p>
+     * Since the producer may run multiple interceptors, a particular
+     * interceptor's {@link #beforeSend(Producer, Message)} callback will be 
called in the
+     * order specified by
+     * {@link ProducerBuilder#intercept(ProducerInterceptor[])}.
+     * <p>
+     * The first interceptor in the list gets the message passed from the 
client,
+     * the following interceptor will be passed the message returned by the
+     * previous interceptor, and so on. Since interceptors are allowed to 
modify
+     * messages, interceptors may potentially get the message already modified 
by
+     * other interceptors. However, building a pipeline of mutable interceptors
+     * that depend on the output of the previous interceptor is discouraged,
+     * because of potential side-effects caused by interceptors potentially
+     * failing to modify the message and throwing an exception. If one of the
+     * interceptors in the list throws an exception from
+     * {@link#beforeSend(Message)}, the exception is caught, logged, and the 
next
+     * interceptor is called with the message returned by the last successful
+     * interceptor in the list, or otherwise the client.
+     *
+     * @param producer the producer which contains the interceptor.
+     * @param message message to send
+     * @return the intercepted message
+     */
+    Message<T> beforeSend(Producer<T> producer, Message<T> message);
+
+    /**
+     * This method is called when the message sent to the broker has been
+     * acknowledged, or when sending the message fails.
+     * This method is generally called just before the user callback is
+     * called, and in additional cases when an exception on the producer side.
+     * <p>
+     * Any exception thrown by this method will be ignored by the caller.
+     * <p>
+     * This method will generally execute in the background I/O thread, so the
+     * implementation should be reasonably fast. Otherwise, sending of messages
+     * from other threads could be delayed.
+     *
+     * @param producer the producer which contains the interceptor.
+     * @param message the message that application sends
+     * @param msgId the message id that assigned by the broker; null if send 
failed.
+     * @param exception the exception on sending messages, null indicates send 
has succeed.
+     */
+    void onSendAcknowledgement(Producer<T> producer, Message<T> message, 
MessageId msgId, Throwable exception);
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index a6fda6f8b6..d8a0de3b9a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -60,10 +60,11 @@
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> 
pendingReceives;
     protected int maxReceiverQueueSize;
     protected Schema<T> schema;
+    protected final ConsumerInterceptors<T> interceptors;
 
     protected ConsumerBase(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorService 
listenerExecutor,
-                           CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema) {
+                           CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema, ConsumerInterceptors interceptors) {
         super(client, topic);
         this.maxReceiverQueueSize = receiverQueueSize;
         this.subscription = conf.getSubscriptionName();
@@ -81,6 +82,7 @@ protected ConsumerBase(PulsarClientImpl client, String topic, 
ConsumerConfigurat
         this.listenerExecutor = listenerExecutor;
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
+        this.interceptors = interceptors;
     }
 
     @Override
@@ -335,6 +337,7 @@ public String getSubscription() {
         return subscription;
     }
 
+    @Override
     public String getConsumerName() {
         return this.consumerName;
     }
@@ -360,4 +363,24 @@ protected void setMaxReceiverQueueSize(int newSize) {
         this.maxReceiverQueueSize = newSize;
     }
 
+    protected Message<T> beforeConsume(Message<T> message) {
+        if (interceptors != null) {
+            return interceptors.beforeConsume(this, message);
+        } else {
+            return message;
+        }
+    }
+
+    protected void onAcknowledge(MessageId messageId, Throwable exception) {
+        if (interceptors != null) {
+            interceptors.onAcknowledge(this, messageId, exception);
+        }
+    }
+
+    protected void onAcknowledgeCumulative(MessageId messageId, Throwable 
exception) {
+        if (interceptors != null) {
+            interceptors.onAcknowledgeCumulative(this, messageId, exception);
+        }
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f0067f7f10..2095babbb1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -20,6 +20,8 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -32,6 +34,7 @@
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.ConsumerInterceptor;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -52,6 +55,7 @@
     private final PulsarClientImpl client;
     private ConsumerConfigurationData<T> conf;
     private final Schema<T> schema;
+    private List<ConsumerInterceptor<T>> interceptorList;
 
     private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
 
@@ -104,8 +108,9 @@
             return FutureUtil.failedFuture(
                     new InvalidConfigurationException("Subscription name must 
be set on the consumer builder"));
         }
-
-        return client.subscribeAsync(conf, schema);
+        return interceptorList == null || interceptorList.size() == 0 ?
+                client.subscribeAsync(conf, schema, null) :
+                client.subscribeAsync(conf, schema, new 
ConsumerInterceptors<>(interceptorList));
     }
 
     @Override
@@ -242,7 +247,16 @@
                return this;
        }
 
-       public ConsumerConfigurationData<T> getConf() {
+    @Override
+    public ConsumerBuilder<T> intercept(ConsumerInterceptor<T>... 
interceptors) {
+        if (interceptorList == null) {
+            interceptorList = new ArrayList<>();
+        }
+        interceptorList.addAll(Arrays.asList(interceptors));
+        return this;
+    }
+
+    public ConsumerConfigurationData<T> getConf() {
            return conf;
        }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fe37b69a50..a0a231902f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -143,14 +143,14 @@
     }
 
     ConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
-        this(client, topic, conf, listenerExecutor, partitionIndex, 
subscribeFuture, SubscriptionMode.Durable, null, schema);
+            ExecutorService listenerExecutor, int partitionIndex, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, 
ConsumerInterceptors interceptors) {
+        this(client, topic, conf, listenerExecutor, partitionIndex, 
subscribeFuture, SubscriptionMode.Durable, null, schema, interceptors);
     }
 
     ConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
                  ExecutorService listenerExecutor, int partitionIndex, 
CompletableFuture<Consumer<T>> subscribeFuture,
-                 SubscriptionMode subscriptionMode, MessageId startMessageId, 
Schema<T> schema) {
-        super(client, topic, conf, conf.getReceiverQueueSize(), 
listenerExecutor, subscribeFuture, schema);
+                 SubscriptionMode subscriptionMode, MessageId startMessageId, 
Schema<T> schema, ConsumerInterceptors interceptors) {
+        super(client, topic, conf, conf.getReceiverQueueSize(), 
listenerExecutor, subscribeFuture, schema, interceptors);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = subscriptionMode;
         this.startMessageId = startMessageId != null ? new 
BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
@@ -263,8 +263,9 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         Message<T> message;
         try {
             message = incomingMessages.take();
-            messageProcessed(message);
-            return message;
+            Message<T> interceptMsg = beforeConsume(message);
+            messageProcessed(interceptMsg);
+            return interceptMsg;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             stats.incrementNumReceiveFailed();
@@ -293,8 +294,9 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         if (message == null && conf.getReceiverQueueSize() == 0) {
             sendFlowPermitsToBroker(cnx(), 1);
         } else if (message != null) {
-            messageProcessed(message);
-            result.complete(message);
+            Message<T> interceptMsg = beforeConsume(message);
+            messageProcessed(interceptMsg);
+            result.complete(interceptMsg);
         }
 
         return result;
@@ -352,10 +354,11 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         Message<T> message;
         try {
             message = incomingMessages.poll(timeout, unit);
-            if (message != null) {
-                messageProcessed(message);
+            Message<T> interceptMsg = beforeConsume(message);
+            if (interceptMsg != null) {
+                messageProcessed(interceptMsg);
             }
-            return message;
+            return interceptMsg;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
@@ -410,7 +413,13 @@ boolean markAckForBatchMessage(BatchMessageIdImpl 
batchMessageId, AckType ackTyp
         checkArgument(messageId instanceof MessageIdImpl);
         if (getState() != State.Ready && getState() != State.Connecting) {
             stats.incrementNumAcksFailed();
-            return FutureUtil.failedFuture(new PulsarClientException("Consumer 
not ready. State: " + getState()));
+            PulsarClientException exception = new 
PulsarClientException("Consumer not ready. State: " + getState());
+            if (AckType.Individual.equals(ackType)) {
+                onAcknowledge(messageId, exception);
+            } else if (AckType.Cumulative.equals(ackType)) {
+                onAcknowledgeCumulative(messageId, exception);
+            }
+            return FutureUtil.failedFuture(exception);
         }
 
         if (messageId instanceof BatchMessageIdImpl) {
@@ -444,7 +453,9 @@ boolean markAckForBatchMessage(BatchMessageIdImpl 
batchMessageId, AckType ackTyp
                 unAckedMessageTracker.remove(msgId);
                 stats.incrementNumAcksSent(1);
             }
+            onAcknowledge(messageId, null);
         } else if (ackType == AckType.Cumulative) {
+            onAcknowledgeCumulative(messageId, null);
             
stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId));
         }
 
@@ -837,9 +848,10 @@ void notifyPendingReceivedCallback(final Message<T> 
message, Exception exception
                         receivedFuture.complete(message);
                     } else {
                         // increase permits for available message-queue
-                        messageProcessed(message);
+                        Message<T> interceptMsg = beforeConsume(message);
+                        messageProcessed(interceptMsg);
                         // return message to receivedCallback
-                        listenerExecutor.execute(() -> 
receivedFuture.complete(message));
+                        listenerExecutor.execute(() -> 
receivedFuture.complete(interceptMsg));
                     }
                 }
             } else {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
new file mode 100644
index 0000000000..a0d30fcb96
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerInterceptor;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A container that hold the list {@link 
org.apache.pulsar.client.api.ConsumerInterceptor} and wraps calls to the chain
+ * of custom interceptors.
+ */
+public class ConsumerInterceptors<T> implements Closeable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ConsumerInterceptors.class);
+
+    private final List<ConsumerInterceptor<T>> interceptors;
+
+    public ConsumerInterceptors(List<ConsumerInterceptor<T>> interceptors) {
+        this.interceptors = interceptors;
+    }
+
+    /**
+     * This is called just before the message is returned by {@link 
Consumer#receive()},
+     * {@link MessageListener#received(Consumer, Message)} or the {@link 
java.util.concurrent.CompletableFuture}
+     * returned by {@link Consumer#receiveAsync()} completes.
+     * <p>
+     * This method calls {@link ConsumerInterceptor#beforeConsume(Consumer, 
Message)} for each interceptor. Messages returned
+     * from each interceptor get passed to beforeConsume() of the next 
interceptor in the chain of interceptors.
+     * <p>
+     * This method does not throw exceptions. If any of the interceptors in 
the chain throws an exception, it gets
+     * caught and logged, and next interceptor in int the chain is called with 
'messages' returned by the previous
+     * successful interceptor beforeConsume call.
+     *
+     * @param consumer the consumer which contains the interceptors
+     * @param message message to be consume by the client.
+     * @return messages that are either modified by interceptors or same as 
messages passed to this method.
+     */
+    public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
+        Message<T> interceptorMessage = message;
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptorMessage = 
interceptors.get(i).beforeConsume(consumer, interceptorMessage);
+            } catch (Exception e) {
+                if (consumer != null) {
+                    log.warn("Error executing interceptor beforeConsume 
callback topic: {} consumerName: {}", consumer.getTopic(), 
consumer.getConsumerName(), e);
+                } else {
+                    log.warn("Error executing interceptor beforeConsume 
callback", e);
+                }
+            }
+        }
+        return interceptorMessage;
+    }
+
+    /**
+     * This is called when acknowledge request return from the broker.
+     * <p>
+     * This method calls {@link ConsumerInterceptor#onAcknowledge(Consumer, 
MessageId, Throwable)} method for each interceptor.
+     * <p>
+     * This method does not throw exceptions. Exceptions thrown by any of 
interceptors in the chain are logged, but not propagated.
+     *
+     * @param consumer the consumer which contains the interceptors
+     * @param messageId message to acknowledge.
+     * @param exception exception returned by broker.
+     */
+    public void onAcknowledge(Consumer<T> consumer, MessageId messageId, 
Throwable exception) {
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptors.get(i).onAcknowledge(consumer, messageId, 
exception);
+            } catch (Exception e) {
+                log.warn("Error executing interceptor onAcknowledge callback 
", e);
+            }
+        }
+    }
+
+    /**
+     * This is called when acknowledge cumulative request return from the 
broker.
+     * <p>
+     * This method calls {@link 
ConsumerInterceptor#onAcknowledgeCumulative(Consumer, MessageId, Throwable)} 
(Message, Throwable)} method for each interceptor.
+     * <p>
+     * This method does not throw exceptions. Exceptions thrown by any of 
interceptors in the chain are logged, but not propagated.
+     *
+     * @param consumer the consumer which contains the interceptors
+     * @param messageId messages to acknowledge.
+     * @param exception exception returned by broker.
+     */
+    public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId 
messageId, Throwable exception) {
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptors.get(i).onAcknowledgeCumulative(consumer, 
messageId, exception);
+            } catch (Exception e) {
+                log.warn("Error executing interceptor onAcknowledgeCumulative 
callback ", e);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptors.get(i).close();
+            } catch (Exception e) {
+                log.error("Fail to close consumer interceptor ", e);
+            }
+        }
+    }
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 96b68c1af4..97e2247f8d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -241,7 +241,7 @@ public String getProducerName() {
         return null;
     }
 
-    ByteBuf getDataBuffer() {
+    public ByteBuf getDataBuffer() {
         return payload;
     }
 
@@ -274,7 +274,7 @@ public String getProperty(String name) {
         return properties.get(name);
     }
 
-    MessageMetadata.Builder getMessageBuilder() {
+    public MessageMetadata.Builder getMessageBuilder() {
         return msgMetadataBuilder;
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f1cb9cf262..75fdac681a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -85,9 +85,9 @@
     private final ConsumerConfigurationData<T> internalConfig;
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor,
-                            CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema) {
+                            CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema, ConsumerInterceptors<T> interceptors) {
         super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), conf,
-                Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, 
subscribeFuture, schema);
+                Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, 
subscribeFuture, schema, interceptors);
 
         checkArgument(conf.getReceiverQueueSize() > 0,
             "Receiver queue size needs to be greater than 0 for Topics 
Consumer");
@@ -632,7 +632,7 @@ private boolean topicNameValid(String topicName) {
                                                                            
ExecutorService listenerExecutor,
                                                                            
CompletableFuture<Consumer<T>> subscribeFuture,
                                                                            int 
numPartitions,
-                                                                           
Schema<T> schema) {
+                                                                           
Schema<T> schema, ConsumerInterceptors<T> interceptors) {
         checkArgument(conf.getTopicNames().size() == 1, "Should have only 1 
topic for partitioned consumer");
 
         // get topic name, then remove it from conf, so constructor will 
create a consumer with no topic.
@@ -641,7 +641,7 @@ private boolean topicNameValid(String topicName) {
         cloneConf.getTopicNames().remove(topicName);
 
         CompletableFuture<Consumer> future = new CompletableFuture<>();
-        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, 
cloneConf, listenerExecutor, future, schema);
+        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, 
cloneConf, listenerExecutor, future, schema, interceptors);
 
         future.thenCompose(c -> 
((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
             .thenRun(()-> subscribeFuture.complete(consumer))
@@ -695,7 +695,7 @@ private void 
subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, S
                         String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
                         ConsumerImpl<T> newConsumer = new 
ConsumerImpl<>(client, partitionName, configurationData,
-                            client.externalExecutorProvider().getExecutor(), 
partitionIndex, subFuture, schema);
+                            client.externalExecutorProvider().getExecutor(), 
partitionIndex, subFuture, schema, interceptors);
                         consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
                         return subFuture;
                     })
@@ -706,7 +706,7 @@ private void 
subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, S
 
             CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
             ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, 
topicName, internalConfig,
-                client.externalExecutorProvider().getExecutor(), 0, subFuture, 
schema);
+                client.externalExecutorProvider().getExecutor(), 0, subFuture, 
schema, interceptors);
             consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
 
             futureList = Collections.singletonList(subFuture);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 3e159e8f48..12ecf2b0f2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -53,8 +53,8 @@
     private final TopicMetadata topicMetadata;
 
     public PartitionedProducerImpl(PulsarClientImpl client, String topic, 
ProducerConfigurationData conf, int numPartitions,
-            CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> 
schema) {
-        super(client, topic, conf, producerCreatedFuture, schema);
+            CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> 
schema, ProducerInterceptors<T> interceptors) {
+        super(client, topic, conf, producerCreatedFuture, schema, 
interceptors);
         this.producers = Lists.newArrayListWithCapacity(numPartitions);
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
@@ -111,7 +111,7 @@ private void start() {
         for (int partitionIndex = 0; partitionIndex < 
topicMetadata.numPartitions(); partitionIndex++) {
             String partitionName = 
TopicName.get(topic).getPartition(partitionIndex).toString();
             ProducerImpl<T> producer = new ProducerImpl<>(client, 
partitionName, conf, new CompletableFuture<>(),
-                    partitionIndex, schema);
+                    partitionIndex, schema, interceptors);
             producers.add(producer);
             producer.producerCreatedFuture().handle((prod, createException) -> 
{
                 if (createException != null) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index d0b0c606c5..3f6dfecdb3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -51,8 +51,8 @@ public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
                                           ConsumerConfigurationData<T> conf,
                                           ExecutorService listenerExecutor,
                                           CompletableFuture<Consumer<T>> 
subscribeFuture,
-                                          Schema<T> schema) {
-        super(client, conf, listenerExecutor, subscribeFuture, schema);
+                                          Schema<T> schema, 
ConsumerInterceptors<T> interceptors) {
+        super(client, conf, listenerExecutor, subscribeFuture, schema, 
interceptors);
         this.topicsPattern = topicsPattern;
 
         if (this.namespaceName == null) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index eb02b6bd8f..39e632c97a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -36,13 +36,15 @@
     protected final CompletableFuture<Producer<T>> producerCreatedFuture;
     protected final ProducerConfigurationData conf;
     protected final Schema<T> schema;
+    protected final ProducerInterceptors<T> interceptors;
 
     protected ProducerBase(PulsarClientImpl client, String topic, 
ProducerConfigurationData conf,
-            CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> 
schema) {
+            CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> 
schema, ProducerInterceptors<T> interceptors) {
         super(client, topic);
         this.producerCreatedFuture = producerCreatedFuture;
         this.conf = conf;
         this.schema = schema;
+        this.interceptors = interceptors;
     }
 
     @Override
@@ -148,6 +150,20 @@ public ProducerConfigurationData getConfiguration() {
         return producerCreatedFuture;
     }
 
+    protected Message<T> beforeSend(Message<T> message) {
+        if (interceptors != null) {
+            return interceptors.beforeSend(this, message);
+        } else {
+            return message;
+        }
+    }
+
+    protected void onSendAcknowledgement(Message<T> message, MessageId msgId, 
Throwable exception) {
+        if (interceptors != null) {
+            interceptors.onSendAcknowledgement(this, message, msgId, 
exception);
+        }
+    }
+
     @Override
     public String toString() {
         return "ProducerBase{" + "topic='" + topic + '\'' + '}';
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index ff391be3ee..834e0c082f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -20,6 +20,9 @@
 
 import com.google.common.annotations.VisibleForTesting;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -33,6 +36,7 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.ProducerInterceptor;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
@@ -46,6 +50,7 @@
     private final PulsarClientImpl client;
     private ProducerConfigurationData conf;
     private Schema<T> schema;
+    private List<ProducerInterceptor<T>> interceptorList;
 
     @VisibleForTesting
     public ProducerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
@@ -97,7 +102,9 @@ private ProducerBuilderImpl(PulsarClientImpl client, 
ProducerConfigurationData c
                     .failedFuture(new IllegalArgumentException("Topic name 
must be set on the producer builder"));
         }
 
-        return client.createProducerAsync(conf, schema);
+        return interceptorList == null || interceptorList.size() == 0 ?
+                client.createProducerAsync(conf, schema, null) :
+                client.createProducerAsync(conf, schema, new 
ProducerInterceptors<>(interceptorList));
     }
 
     @Override
@@ -226,4 +233,13 @@ private ProducerBuilderImpl(PulsarClientImpl client, 
ProducerConfigurationData c
         conf.getProperties().putAll(properties);
         return this;
     }
+
+    @Override
+    public ProducerBuilder<T> intercept(ProducerInterceptor<T>... 
interceptors) {
+        if (interceptorList == null) {
+            interceptorList = new ArrayList<>();
+        }
+        interceptorList.addAll(Arrays.asList(interceptors));
+        return this;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index c3428e2644..9401e7cd20 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -121,8 +121,9 @@
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
 
     public ProducerImpl(PulsarClientImpl client, String topic, 
ProducerConfigurationData conf,
-                        CompletableFuture<Producer<T>> producerCreatedFuture, 
int partitionIndex, Schema<T> schema) {
-        super(client, topic, conf, producerCreatedFuture, schema);
+                        CompletableFuture<Producer<T>> producerCreatedFuture, 
int partitionIndex, Schema<T> schema,
+                        ProducerInterceptors<T> interceptors) {
+        super(client, topic, conf, producerCreatedFuture, schema, 
interceptors);
         this.producerId = client.newProducerId();
         this.producerName = conf.getProducerName();
         this.partitionIndex = partitionIndex;
@@ -205,9 +206,16 @@ public long getLastSequenceId() {
 
     @Override
     CompletableFuture<MessageId> internalSendAsync(Message<T> message) {
+
         CompletableFuture<MessageId> future = new CompletableFuture<>();
 
-        sendAsync(message, new SendCallback() {
+        MessageImpl<T> interceptorMessage = (MessageImpl<T>) 
beforeSend(message);
+        //Retain the buffer used by interceptors callback to get message. 
Buffer will release after complete interceptors.
+        interceptorMessage.getDataBuffer().retain();
+        if (interceptors != null) {
+            interceptorMessage.getProperties();
+        }
+        sendAsync(interceptorMessage, new SendCallback() {
             SendCallback nextCallback = null;
             MessageImpl<?> nextMsg = null;
             long createdAt = System.nanoTime();
@@ -229,25 +237,40 @@ public SendCallback getNextSendCallback() {
 
             @Override
             public void sendComplete(Exception e) {
-                if (e != null) {
-                    stats.incrementSendFailed();
-                    future.completeExceptionally(e);
-                } else {
-                    future.complete(message.getMessageId());
-                    stats.incrementNumAcksReceived(System.nanoTime() - 
createdAt);
-                }
-                while (nextCallback != null) {
-                    SendCallback sendCallback = nextCallback;
-                    MessageImpl<?> msg = nextMsg;
+                try {
                     if (e != null) {
                         stats.incrementSendFailed();
-                        sendCallback.getFuture().completeExceptionally(e);
+                        onSendAcknowledgement(interceptorMessage, null, e);
+                        future.completeExceptionally(e);
                     } else {
-                        sendCallback.getFuture().complete(msg.getMessageId());
+                        onSendAcknowledgement(interceptorMessage, 
interceptorMessage.getMessageId(), null);
+                        future.complete(interceptorMessage.getMessageId());
                         stats.incrementNumAcksReceived(System.nanoTime() - 
createdAt);
                     }
-                    nextMsg = nextCallback.getNextMessage();
-                    nextCallback = nextCallback.getNextSendCallback();
+                } finally {
+                    interceptorMessage.getDataBuffer().release();
+                }
+
+                while (nextCallback != null) {
+                    SendCallback sendCallback = nextCallback;
+                    MessageImpl<?> msg = nextMsg;
+                    //Retain the buffer used by interceptors callback to get 
message. Buffer will release after complete interceptors.
+                    try {
+                        msg.getDataBuffer().retain();
+                        if (e != null) {
+                            stats.incrementSendFailed();
+                            onSendAcknowledgement((Message<T>) msg, null, e);
+                            sendCallback.getFuture().completeExceptionally(e);
+                        } else {
+                            onSendAcknowledgement((Message<T>) msg, 
msg.getMessageId(), null);
+                            
sendCallback.getFuture().complete(msg.getMessageId());
+                            stats.incrementNumAcksReceived(System.nanoTime() - 
createdAt);
+                        }
+                        nextMsg = nextCallback.getNextMessage();
+                        nextCallback = nextCallback.getNextSendCallback();
+                    } finally {
+                        msg.getDataBuffer().release();
+                    }
                 }
             }
 
@@ -292,14 +315,18 @@ public void sendAsync(Message<T> message, SendCallback 
callback) {
             String compressedStr = (!isBatchMessagingEnabled() && 
conf.getCompressionType() != CompressionType.NONE)
                     ? "Compressed"
                     : "";
-            callback.sendComplete(new 
PulsarClientException.InvalidMessageException(
-                    format("%s Message payload size %d cannot exceed %d 
bytes", compressedStr, compressedSize,
-                            PulsarDecoder.MaxMessageSize)));
+            PulsarClientException.InvalidMessageException 
invalidMessageException =
+                    new PulsarClientException.InvalidMessageException(
+                            format("%s Message payload size %d cannot exceed 
%d bytes", compressedStr, compressedSize,
+                                    PulsarDecoder.MaxMessageSize));
+            callback.sendComplete(invalidMessageException);
             return;
         }
 
         if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) {
-            callback.sendComplete(new 
PulsarClientException.InvalidMessageException("Cannot re-use the same 
message"));
+            PulsarClientException.InvalidMessageException 
invalidMessageException =
+                    new PulsarClientException.InvalidMessageException("Cannot 
re-use the same message");
+            callback.sendComplete(invalidMessageException);
             compressedPayload.release();
             return;
         }
@@ -463,8 +490,7 @@ private boolean canEnqueueRequest(SendCallback callback) {
                 semaphore.acquire();
             } else {
                 if (!semaphore.tryAcquire()) {
-                    callback.sendComplete(
-                            new 
PulsarClientException.ProducerQueueIsFullError("Producer send queue is full"));
+                    callback.sendComplete(new 
PulsarClientException.ProducerQueueIsFullError("Producer send queue is full"));
                     return false;
                 }
             }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
new file mode 100644
index 0000000000..59d1ad3183
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerInterceptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A container that holds the list{@link 
org.apache.pulsar.client.api.ProducerInterceptor}
+ * and wraps calls to the chain of custom interceptors.
+ */
+public class ProducerInterceptors<T> implements Closeable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ProducerInterceptors.class);
+
+    private final List<ProducerInterceptor<T>> interceptors;
+
+    public ProducerInterceptors(List<ProducerInterceptor<T>> interceptors) {
+        this.interceptors = interceptors;
+    }
+
+    /**
+     * This is called when client sends message to pulsar broker, before key 
and value gets serialized.
+     * The method calls {@link 
ProducerInterceptor#beforeSend(Producer,Message)} method. Message returned from
+     * first interceptor's beforeSend() is passed to the second interceptor 
beforeSend(), and so on in the
+     * interceptor chain. The message returned from the last interceptor is 
returned from this method.
+     *
+     * This method does not throw exceptions. Exceptions thrown by any 
interceptor methods are caught and ignored.
+     * If a interceptor in the middle of the chain, that normally modifies the 
message, throws an exception,
+     * the next interceptor in the chain will be called with a message 
returned by the previous interceptor that did
+     * not throw an exception.
+     *
+     * @param producer the producer which contains the interceptor.
+     * @param message the message from client
+     * @return the message to send to topic/partition
+     */
+    public Message<T> beforeSend(Producer<T> producer, Message<T> message) {
+        Message<T> interceptorMessage = message;
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptorMessage = interceptors.get(i).beforeSend(producer, 
interceptorMessage);
+            } catch (Exception e) {
+                if (message != null && producer != null) {
+                    log.warn("Error executing interceptor beforeSend callback 
for messageId: {}, topicName:{} ", message.getMessageId(), producer.getTopic(), 
e);
+                } else {
+                    log.warn("Error Error executing interceptor beforeSend 
callback ", e);
+                }
+            }
+        }
+        return interceptorMessage;
+    }
+
+    /**
+     * This method is called when the message send to the broker has been 
acknowledged, or when sending the record fails
+     * before it gets send to the broker.
+     * This method calls {@link 
ProducerInterceptor#onSendAcknowledgement(Producer, Message, MessageId, 
Throwable)} method for
+     * each interceptor.
+     *
+     * This method does not throw exceptions. Exceptions thrown by any of 
interceptor methods are caught and ignored.
+     *
+     * @param producer the producer which contains the interceptor.
+     * @param message The message returned from the last interceptor is 
returned from {@link ProducerInterceptor#beforeSend(Producer, Message)}
+     * @param msgId The message id that broker returned. Null if has error 
occurred.
+     * @param exception The exception thrown during processing of this 
message. Null if no error occurred.
+     */
+    public void onSendAcknowledgement(Producer<T> producer, Message<T> 
message, MessageId msgId, Throwable exception) {
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptors.get(i).onSendAcknowledgement(producer, message, 
msgId, exception);
+            } catch (Exception e) {
+                log.warn("Error executing interceptor onSendAcknowledgement 
callback ", e);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptors.get(i).close();
+            } catch (Exception e) {
+                log.error("Fail to close producer interceptor ", e);
+            }
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 4013068246..b87b9bd913 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -238,10 +238,15 @@ public ClientConfigurationData getConfiguration() {
     }
 
     public CompletableFuture<Producer<byte[]>> 
createProducerAsync(ProducerConfigurationData conf) {
-        return createProducerAsync(conf, Schema.BYTES);
+        return createProducerAsync(conf, Schema.BYTES, null);
     }
 
-    public <T> CompletableFuture<Producer<T>> 
createProducerAsync(ProducerConfigurationData conf, Schema<T> schema) {
+    public <T> CompletableFuture<Producer<T>> 
createProducerAsync(ProducerConfigurationData conf,  Schema<T> schema) {
+        return createProducerAsync(conf, schema, null);
+    }
+
+    public <T> CompletableFuture<Producer<T>> 
createProducerAsync(ProducerConfigurationData conf, Schema<T> schema,
+          ProducerInterceptors<T> interceptors) {
         if (conf == null) {
             return FutureUtil.failedFuture(
                     new 
PulsarClientException.InvalidConfigurationException("Producer configuration 
undefined"));
@@ -273,9 +278,9 @@ public ClientConfigurationData getConfiguration() {
             ProducerBase<T> producer;
             if (metadata.partitions > 1) {
                 producer = new 
PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, 
metadata.partitions,
-                        producerCreatedFuture, schema);
+                        producerCreatedFuture, schema, interceptors);
             } else {
-                producer = new ProducerImpl<>(PulsarClientImpl.this, topic, 
conf, producerCreatedFuture, -1, schema);
+                producer = new ProducerImpl<>(PulsarClientImpl.this, topic, 
conf, producerCreatedFuture, -1, schema, interceptors);
             }
 
             synchronized (producers) {
@@ -336,10 +341,10 @@ public ClientConfigurationData getConfiguration() {
     }
 
     public CompletableFuture<Consumer<byte[]>> 
subscribeAsync(ConsumerConfigurationData<byte[]> conf) {
-        return subscribeAsync(conf, Schema.BYTES);
+        return subscribeAsync(conf, Schema.BYTES, null);
     }
 
-    public <T> CompletableFuture<Consumer<T>> 
subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+    public <T> CompletableFuture<Consumer<T>> 
subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, 
ConsumerInterceptors<T> interceptors) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new 
PulsarClientException.AlreadyClosedException("Client already closed"));
         }
@@ -377,15 +382,15 @@ public ClientConfigurationData getConfiguration() {
                 return FutureUtil
                     .failedFuture(new IllegalArgumentException("Topic names 
list must be null when use topicsPattern"));
             }
-            return patternTopicSubscribeAsync(conf, schema);
+            return patternTopicSubscribeAsync(conf, schema, interceptors);
         } else if (conf.getTopicNames().size() == 1) {
-            return singleTopicSubscribeAsync(conf, schema);
+            return singleTopicSubscribeAsync(conf, schema, interceptors);
         } else {
-            return multiTopicSubscribeAsync(conf, schema);
+            return multiTopicSubscribeAsync(conf, schema, interceptors);
         }
     }
 
-    private <T> CompletableFuture<Consumer<T>> 
singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+    private <T> CompletableFuture<Consumer<T>> 
singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, 
ConsumerInterceptors<T> interceptors) {
         if (schema instanceof AutoSchema) {
             AutoSchema autoSchema = (AutoSchema) schema;
             return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
@@ -395,20 +400,20 @@ public ClientConfigurationData getConfiguration() {
                             log.info("Auto detected schema for topic {} : {}",
                                 conf.getSingleTopic(), new 
String(schemaInfoOptional.get().getSchema(), UTF_8));
                             autoSchema.setSchema(genericSchema);
-                            return doSingleTopicSubscribeAsync(conf, schema);
+                            return doSingleTopicSubscribeAsync(conf, schema, 
interceptors);
                         } else {
                             return FutureUtil.failedFuture(
                                 new 
PulsarClientException.LookupException("Currently schema detection only works 
for topics with avro schemas"));
                         }
                     });
         } else {
-            return doSingleTopicSubscribeAsync(conf, schema);
+            return doSingleTopicSubscribeAsync(conf, schema, interceptors);
         }
     }
 
 
 
-    private <T> CompletableFuture<Consumer<T>> 
doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> 
schema) {
+    private <T> CompletableFuture<Consumer<T>> 
doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> 
schema, ConsumerInterceptors<T> interceptors) {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
 
         String topic = conf.getSingleTopic();
@@ -423,10 +428,10 @@ public ClientConfigurationData getConfiguration() {
             ExecutorService listenerThread = 
externalExecutorProvider.getExecutor();
             if (metadata.partitions > 1) {
                 consumer = 
MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
-                    listenerThread, consumerSubscribedFuture, 
metadata.partitions, schema);
+                    listenerThread, consumerSubscribedFuture, 
metadata.partitions, schema, interceptors);
             } else {
                 consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, 
conf, listenerThread, -1,
-                        consumerSubscribedFuture, schema);
+                        consumerSubscribedFuture, schema, interceptors);
             }
 
             synchronized (consumers) {
@@ -441,11 +446,11 @@ public ClientConfigurationData getConfiguration() {
         return consumerSubscribedFuture;
     }
 
-    private <T> CompletableFuture<Consumer<T>> 
multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+    private <T> CompletableFuture<Consumer<T>> 
multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, 
ConsumerInterceptors<T> interceptors) {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
 
         ConsumerBase<T> consumer = new 
MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
-                externalExecutorProvider.getExecutor(), 
consumerSubscribedFuture, schema);
+                externalExecutorProvider.getExecutor(), 
consumerSubscribedFuture, schema, interceptors);
 
         synchronized (consumers) {
             consumers.put(consumer, Boolean.TRUE);
@@ -455,10 +460,10 @@ public ClientConfigurationData getConfiguration() {
     }
 
     public CompletableFuture<Consumer<byte[]>> 
patternTopicSubscribeAsync(ConsumerConfigurationData<byte[]> conf) {
-        return patternTopicSubscribeAsync(conf, Schema.BYTES);
+        return patternTopicSubscribeAsync(conf, Schema.BYTES, null);
     }
 
-    private <T> CompletableFuture<Consumer<T>> 
patternTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) 
{
+    private <T> CompletableFuture<Consumer<T>> 
patternTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, 
ConsumerInterceptors interceptors) {
         String regex = conf.getTopicsPattern().pattern();
         TopicName destination = TopicName.get(regex);
         NamespaceName namespaceName = destination.getNamespaceObject();
@@ -479,7 +484,7 @@ public ClientConfigurationData getConfiguration() {
                     conf,
                     externalExecutorProvider.getExecutor(),
                     consumerSubscribedFuture,
-                    schema);
+                    schema, interceptors);
 
                 synchronized (consumers) {
                     consumers.put(consumer, Boolean.TRUE);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index aafd12549f..388e5c487d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -85,7 +85,7 @@ public void reachedEndOfTopic(Consumer<T> consumer) {
 
         final int partitionIdx = 
TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, 
readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                partitionIdx, consumerFuture, SubscriptionMode.NonDurable, 
readerConfiguration.getStartMessageId(), schema);
+                partitionIdx, consumerFuture, SubscriptionMode.NonDurable, 
readerConfiguration.getStartMessageId(), schema, null);
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index eae02b08c0..230a022c61 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -127,4 +127,8 @@ public T getValue() {
     public Optional<EncryptionContext> getEncryptionCtx() {
         return msg.getEncryptionCtx();
     }
+
+    public Message<T> getMessage() {
+        return msg;
+    }
 }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 4380c63ff7..f5108fc6c1 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -64,7 +64,7 @@ public void setup() {
         logger = mock(Logger.class);
         client = mock(PulsarClientImpl.class);
         when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, 
Schema.BYTES));
-        
when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), 
Matchers.any(Schema.class)))
+        
when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), 
Matchers.any(Schema.class), eq(null)))
                 .thenReturn(CompletableFuture.completedFuture(producer));
         
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
         
when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index 89efcf594d..399a4c701b 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -21,7 +21,8 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
@@ -40,6 +41,7 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.ProducerInterceptor;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -190,6 +192,11 @@
         public ProducerBuilder<byte[]> properties(Map<String, String> 
properties) {
             return this;
         }
+
+        @Override
+        public ProducerBuilder<byte[]> 
intercept(ProducerInterceptor<byte[]>... interceptors) {
+            return null;
+        }
     }
 
     @BeforeMethod
@@ -197,7 +204,7 @@ public void setup() throws Exception {
         this.mockClient = mock(PulsarClient.class);
 
         when(mockClient.newProducer(any(Schema.class)))
-            .thenReturn(new MockProducerBuilder());
+                .thenReturn(new MockProducerBuilder());
 
         producers = new 
MultiConsumersOneOuputTopicProducers<byte[]>(mockClient, TEST_OUTPUT_TOPIC, 
Schema.BYTES, "test");
         producers.initialize();
@@ -206,12 +213,12 @@ public void setup() throws Exception {
     private Producer<byte[]> createMockProducer(String topic) {
         Producer<byte[]> producer = mock(Producer.class);
         when(producer.closeAsync())
-            .thenAnswer(invocationOnMock -> {
-                synchronized (mockProducers) {
-                    mockProducers.remove(topic);
-                }
-                return FutureUtils.Void();
-            });
+                .thenAnswer(invocationOnMock -> {
+                    synchronized (mockProducers) {
+                        mockProducers.remove(topic);
+                    }
+                    return FutureUtils.Void();
+                });
         return producer;
     }
 
@@ -224,13 +231,13 @@ public void testGetCloseProducer() throws Exception {
 
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer(Schema.BYTES);
+                .newProducer(Schema.BYTES);
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // second get will not create a new producer
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer(Schema.BYTES);
+                .newProducer(Schema.BYTES);
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // close


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to