This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f2160c0  Introduce ActiveConsumerListener for realizing if a consumer 
is active in a failover subscription group (#1156)
f2160c0 is described below

commit f2160c01e3581f0e8374db5d4d713810de7533af
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Thu Feb 15 12:12:00 2018 +0800

    Introduce ActiveConsumerListener for realizing if a consumer is active in a 
failover subscription group (#1156)
    
    * Introduce ConsumerGroupListener for realizing if a consumer is active in 
a failover subscription group
    
    * Rename ConsumerGroupListener to ActiveConsumerListener
    
    * Fail subscribe if active consumer listener is provided with non-failover 
subscription.
    
    * Notify consumer state change only after the cursor is rewinded.
    
    * Address comments
    
    * Rename ActiveConsumerListener to ConsumerEventListener
    Use consumer object as identifier for comparison
    
    * Fix test after rebase
    
    * Fix license headers
    
    * Ignore active consumer change command in cpp client
    
    * rename "become" to "became"
---
 .../AbstractDispatcherSingleActiveConsumer.java    |  38 +-
 .../org/apache/pulsar/broker/service/Consumer.java |  15 +
 .../PersistentDispatcherSingleActiveConsumer.java  |  10 +-
 .../PersistentDispatcherFailoverConsumerTest.java  | 157 ++++++-
 .../broker/service/PersistentFailoverE2ETest.java  |  96 +++-
 pulsar-client-cpp/lib/ClientConnection.cc          |   7 +
 pulsar-client-cpp/lib/Commands.cc                  |   3 +
 .../pulsar/client/api/ConsumerConfiguration.java   |  29 ++
 .../pulsar/client/api/ConsumerEventListener.java   |  36 ++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  14 +
 .../apache/pulsar/client/impl/ConsumerBase.java    |   3 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  14 +
 .../client/impl/PartitionedConsumerImpl.java       |   4 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |   7 +
 .../org/apache/pulsar/common/api/Commands.java     |  18 +
 .../apache/pulsar/common/api/PulsarDecoder.java    |  11 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 486 +++++++++++++++++++++
 pulsar-common/src/main/proto/PulsarApi.proto       |  12 +-
 .../pulsar/common/api/PulsarDecoderTest.java       |  60 +++
 19 files changed, 993 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 5181df1..1b37f89 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -26,9 +26,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.broker.service.Consumer;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
+import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 import org.slf4j.Logger;
@@ -72,7 +72,17 @@ public abstract class AbstractDispatcherSingleActiveConsumer 
{
 
     protected abstract boolean isConsumersExceededOnSubscription();
 
-    protected void pickAndScheduleActiveConsumer() {
+    protected void notifyActiveConsumerChanged(Consumer activeConsumer) {
+        if (null != activeConsumer && subscriptionType == SubType.Failover) {
+            consumers.forEach(consumer ->
+                consumer.notifyActiveConsumerChange(activeConsumer));
+        }
+    }
+
+    /**
+     * @return the previous active consumer if the consumer is changed, 
otherwise null.
+     */
+    protected boolean pickAndScheduleActiveConsumer() {
         checkArgument(!consumers.isEmpty());
 
         consumers.sort((c1, c2) -> 
c1.consumerName().compareTo(c2.consumerName()));
@@ -80,12 +90,15 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer {
         int index = partitionIndex % consumers.size();
         Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, 
consumers.get(index));
 
-        if (prevConsumer == ACTIVE_CONSUMER_UPDATER.get(this)) {
+        Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+        if (prevConsumer == activeConsumer) {
             // Active consumer did not change. Do nothing at this point
-            return;
+            return false;
+        } else {
+            // If the active consumer is changed, send notification.
+            scheduleReadOnActiveConsumer();
+            return true;
         }
-
-        scheduleReadOnActiveConsumer();
     }
 
     public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
@@ -109,8 +122,17 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer {
 
         consumers.add(consumer);
 
-        // Pick an active consumer and start it
-        pickAndScheduleActiveConsumer();
+        if (!pickAndScheduleActiveConsumer()) {
+            // the active consumer is not changed
+            Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            if (null == currentActiveConsumer) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Current active consumer disappears while adding 
consumer {}", consumer);
+                }
+            } else {
+                consumer.notifyActiveConsumerChange(currentActiveConsumer);
+            }
+        }
 
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index fd77ef2..0bb30cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -152,6 +152,21 @@ public class Consumer {
         return consumerName;
     }
 
+    void notifyActiveConsumerChange(Consumer activeConsumer) {
+        if 
(!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion()))
 {
+            // if the client is older than `v12`, we don't need to send 
consumer group changes.
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("notify consumer {} - that [{}] for subscription {} has 
new active consumer : {}",
+                consumerId, topicName, subscription.getName(), activeConsumer);
+        }
+        cnx.ctx().writeAndFlush(
+            Commands.newActiveConsumerChange(consumerId, this == 
activeConsumer),
+            cnx.ctx().voidPromise());
+    }
+
     public boolean readCompacted() {
         return readCompacted;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 716e332..40678f8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -85,7 +85,10 @@ public final class PersistentDispatcherSingleActiveConsumer 
extends AbstractDisp
                 log.debug("[{}] Rewind cursor and read more entries without 
delay", name);
             }
             cursor.rewind();
-            readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this));
+
+            Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            notifyActiveConsumerChanged(activeConsumer);
+            readMoreEntries(activeConsumer);
             return;
         }
 
@@ -102,7 +105,10 @@ public final class 
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
                         
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
             }
             cursor.rewind();
-            readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this));
+
+            Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            notifyActiveConsumerChanged(activeConsumer);
+            readMoreEntries(activeConsumer);
             readOnActiveConsumerTask = null;
         }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index a68165d..922b35b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -22,23 +22,29 @@ import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMo
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.matches;
+import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
 import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -57,17 +63,18 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.Consumer;
-import org.apache.pulsar.broker.service.ServerCnx;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
 import org.mockito.invocation.InvocationOnMock;
@@ -83,9 +90,12 @@ public class PersistentDispatcherFailoverConsumerTest {
     private BrokerService brokerService;
     private ManagedLedgerFactory mlFactoryMock;
     private ServerCnx serverCnx;
+    private ServerCnx serverCnxWithOldVersion;
     private ManagedLedger ledgerMock;
     private ManagedCursor cursorMock;
     private ConfigurationCacheService configCacheService;
+    private ChannelHandlerContext channelCtx;
+    private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;
 
     final String successTopicName = 
"persistent://part-perf/global/perf.t1/ptopic";
     final String failTopicName = 
"persistent://part-perf/global/perf.t1/pfailTopic";
@@ -115,10 +125,50 @@ public class PersistentDispatcherFailoverConsumerTest {
         brokerService = spy(new BrokerService(pulsar));
         doReturn(brokerService).when(pulsar).getBrokerService();
 
+        consumerChanges = new LinkedBlockingQueue<>();
+        this.channelCtx = mock(ChannelHandlerContext.class);
+        doAnswer(invocationOnMock -> {
+            ByteBuf buf = invocationOnMock.getArgumentAt(0, ByteBuf.class);
+
+            ByteBuf cmdBuf = buf.retainedSlice(4, buf.writerIndex() - 4);
+            try {
+                int cmdSize = (int) cmdBuf.readUnsignedInt();
+                int writerIndex = cmdBuf.writerIndex();
+                cmdBuf.writerIndex(cmdBuf.readerIndex() + cmdSize);
+                ByteBufCodedInputStream cmdInputStream = 
ByteBufCodedInputStream.get(cmdBuf);
+
+                BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder();
+                BaseCommand cmd = cmdBuilder.mergeFrom(cmdInputStream, 
null).build();
+                cmdBuilder.recycle();
+                cmdBuf.writerIndex(writerIndex);
+                cmdInputStream.recycle();
+
+                if (cmd.hasActiveConsumerChange()) {
+                    consumerChanges.put(cmd.getActiveConsumerChange());
+                }
+                cmd.recycle();
+            } finally {
+                cmdBuf.release();
+            }
+
+            return null;
+        }).when(channelCtx).writeAndFlush(any(), any());
+
         serverCnx = spy(new ServerCnx(brokerService));
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
+        
when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber());
+        when(serverCnx.ctx()).thenReturn(channelCtx);
+
+        serverCnxWithOldVersion = spy(new ServerCnx(brokerService));
+        doReturn(true).when(serverCnxWithOldVersion).isActive();
+        doReturn(true).when(serverCnxWithOldVersion).isWritable();
+        doReturn(new InetSocketAddress("localhost", 1234))
+            .when(serverCnxWithOldVersion).clientAddress();
+        when(serverCnxWithOldVersion.getRemoteEndpointProtocolVersion())
+            .thenReturn(ProtocolVersion.v11.getNumber());
+        when(serverCnxWithOldVersion.ctx()).thenReturn(channelCtx);
 
         NamespaceService nsSvc = mock(NamespaceService.class);
         doReturn(nsSvc).when(pulsar).getNamespaceService();
@@ -193,6 +243,51 @@ public class PersistentDispatcherFailoverConsumerTest {
         }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), 
any(DeleteCursorCallback.class), anyObject());
     }
 
+    private void verifyActiveConsumerChange(CommandActiveConsumerChange change,
+                                            long consumerId,
+                                            boolean isActive) {
+        assertEquals(consumerId, change.getConsumerId());
+        assertEquals(isActive, change.getIsActive());
+        change.recycle();
+    }
+
+    @Test
+    public void testConsumerGroupChangesWithOldNewConsumers() throws Exception 
{
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock);
+
+        int partitionIndex = 0;
+        PersistentDispatcherSingleActiveConsumer pdfc = new 
PersistentDispatcherSingleActiveConsumer(cursorMock,
+                SubType.Failover, partitionIndex, topic);
+
+        // 1. Verify no consumers connected
+        assertFalse(pdfc.isConsumerConnected());
+
+        // 2. Add old consumer
+        Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
+                "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, 
"myrole-1", Collections.emptyMap(), false);
+        pdfc.addConsumer(consumer1);
+        List<Consumer> consumers = pdfc.getConsumers();
+        assertTrue(consumers.get(0).consumerName() == 
consumer1.consumerName());
+        assertEquals(1, consumers.size());
+        assertNull(consumerChanges.poll());
+
+        verify(channelCtx, times(0)).write(any());
+
+        // 3. Add new consumer
+        Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0,
+                "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(), false);
+        pdfc.addConsumer(consumer2);
+        consumers = pdfc.getConsumers();
+        assertTrue(consumers.get(0).consumerName() == 
consumer1.consumerName());
+        assertEquals(2, consumers.size());
+
+        CommandActiveConsumerChange change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 2, false);
+
+        verify(channelCtx, times(1)).writeAndFlush(any(), any());
+    }
+
     @Test
     public void testAddRemoveConsumer() throws Exception {
         log.info("--- Starting 
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
@@ -208,13 +303,16 @@ public class PersistentDispatcherFailoverConsumerTest {
         assertFalse(pdfc.isConsumerConnected());
 
         // 2. Add consumer
-        Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
+        Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
                 "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */);
+                false /* read compacted */));
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertTrue(consumers.get(0).consumerName() == 
consumer1.consumerName());
         assertEquals(1, consumers.size());
+        CommandActiveConsumerChange change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, true);
+        verify(consumer1, 
times(1)).notifyActiveConsumerChange(same(consumer1));
 
         // 3. Add again, duplicate allowed
         pdfc.addConsumer(consumer1);
@@ -224,31 +322,57 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 4. Verify active consumer
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer1.consumerName());
+        // get the notified with who is the leader
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, true);
+        verify(consumer1, 
times(2)).notifyActiveConsumerChange(same(consumer1));
 
         // 5. Add another consumer which does not change active consumer
-        Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
-                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */);
+        Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
+                50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */));
         pdfc.addConsumer(consumer2);
         consumers = pdfc.getConsumers();
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer1.consumerName());
         assertEquals(3, consumers.size());
+        // get notified with who is the leader
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 2, false);
+        verify(consumer1, 
times(2)).notifyActiveConsumerChange(same(consumer1));
+        verify(consumer2, 
times(1)).notifyActiveConsumerChange(same(consumer1));
 
         // 6. Add a consumer which changes active consumer
-        Consumer consumer0 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 0 /* consumer id */, 0,
+        Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 0 /* consumer id */, 0,
                 "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
-                false /* read compacted */);
+                false /* read compacted */));
         pdfc.addConsumer(consumer0);
         consumers = pdfc.getConsumers();
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer0.consumerName());
         assertEquals(4, consumers.size());
 
+        // all consumers will receive notifications
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 0, true);
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, false);
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, false);
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 2, false);
+        verify(consumer0, 
times(1)).notifyActiveConsumerChange(same(consumer0));
+        verify(consumer1, 
times(2)).notifyActiveConsumerChange(same(consumer1));
+        verify(consumer1, 
times(2)).notifyActiveConsumerChange(same(consumer0));
+        verify(consumer2, 
times(1)).notifyActiveConsumerChange(same(consumer1));
+        verify(consumer2, 
times(1)).notifyActiveConsumerChange(same(consumer0));
+
         // 7. Remove last consumer
         pdfc.removeConsumer(consumer2);
         consumers = pdfc.getConsumers();
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer0.consumerName());
         assertEquals(3, consumers.size());
+        // not consumer group changes
+        assertNull(consumerChanges.poll());
 
-        // 8. Verify if we can unsubscribe when more than one consumer is 
connected
+        // 8. Verify if we cannot unsubscribe when more than one consumer is 
connected
         assertFalse(pdfc.canUnsubscribe(consumer0));
 
         // 9. Remove active consumer
@@ -257,6 +381,12 @@ public class PersistentDispatcherFailoverConsumerTest {
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer1.consumerName());
         assertEquals(2, consumers.size());
 
+        // the remaining consumers will receive notifications
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, true);
+        change = consumerChanges.take();
+        verifyActiveConsumerChange(change, 1, true);
+
         // 10. Attempt to remove already removed consumer
         String cause = "";
         try {
@@ -271,10 +401,11 @@ public class PersistentDispatcherFailoverConsumerTest {
         consumers = pdfc.getConsumers();
         assertTrue(pdfc.getActiveConsumer().consumerName() == 
consumer1.consumerName());
         assertEquals(1, consumers.size());
+        // not consumer group changes
+        assertNull(consumerChanges.poll());
 
         // 11. With only one consumer, unsubscribe is allowed
         assertTrue(pdfc.canUnsubscribe(consumer1));
-
     }
     
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index c172b01..7bc3141 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -24,10 +24,12 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
@@ -35,6 +37,7 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -42,6 +45,7 @@ import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -68,22 +72,84 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
 
     private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 2000;
 
+    private static class TestConsumerStateEventListener implements 
ConsumerEventListener {
+
+        final LinkedBlockingQueue<Integer> activeQueue = new 
LinkedBlockingQueue<>();
+        final LinkedBlockingQueue<Integer> inActiveQueue = new 
LinkedBlockingQueue<>();
+
+        @Override
+        public void becameActive(Consumer consumer, int partitionId) {
+            try {
+                activeQueue.put(partitionId);
+            } catch (InterruptedException e) {
+            }
+        }
+
+        @Override
+        public void becameInactive(Consumer consumer, int partitionId) {
+            try {
+                inActiveQueue.put(partitionId);
+            } catch (InterruptedException e) {
+            }
+        }
+    }
+
+    private void 
verifyConsumerNotReceiveAnyStateChanges(TestConsumerStateEventListener 
listener) throws Exception {
+        assertNull(listener.activeQueue.poll());
+        assertNull(listener.inActiveQueue.poll());
+    }
+
+    private void verifyConsumerActive(TestConsumerStateEventListener listener, 
int partitionId) throws Exception {
+        assertEquals(partitionId, listener.activeQueue.take().intValue());
+        assertNull(listener.inActiveQueue.poll());
+    }
+
+    private void verifyConsumerInactive(TestConsumerStateEventListener 
listener, int partitionId) throws Exception {
+        assertEquals(partitionId, listener.inActiveQueue.take().intValue());
+        assertNull(listener.activeQueue.poll());
+    }
+
+    private static class ActiveInactiveListenerEvent implements 
ConsumerEventListener {
+
+        private final Set<Integer> activePtns = Sets.newHashSet();
+        private final Set<Integer> inactivePtns = Sets.newHashSet();
+
+        @Override
+        public synchronized void becameActive(Consumer consumer, int 
partitionId) {
+            activePtns.add(partitionId);
+            inactivePtns.remove(partitionId);
+        }
+
+        @Override
+        public synchronized void becameInactive(Consumer consumer, int 
partitionId) {
+            activePtns.remove(partitionId);
+            inactivePtns.add(partitionId);
+        }
+    }
+
     @Test
     public void testSimpleConsumerEventsWithoutPartition() throws Exception {
         final String topicName = 
"persistent://prop/use/ns-abc/failover-topic1";
         final String subName = "sub1";
         final int numMsgs = 100;
 
+        TestConsumerStateEventListener listener1 = new 
TestConsumerStateEventListener();
         ConsumerConfiguration consumerConf1 = new ConsumerConfiguration();
         consumerConf1.setSubscriptionType(SubscriptionType.Failover);
         consumerConf1.setConsumerName("1");
+        consumerConf1.setConsumerEventListener(listener1);
+
+        TestConsumerStateEventListener listener2 = new 
TestConsumerStateEventListener();
         ConsumerConfiguration consumerConf2 = new ConsumerConfiguration();
         consumerConf2.setSubscriptionType(SubscriptionType.Failover);
         consumerConf2.setConsumerName("2");
+        consumerConf2.setConsumerEventListener(listener2);
 
         // 1. two consumers on the same subscription
         Consumer consumer1 = pulsarClient.subscribe(topicName, subName, 
consumerConf1);
         Consumer consumer2 = pulsarClient.subscribe(topicName, subName, 
consumerConf2);
+        verifyConsumerActive(listener1, -1);
+        verifyConsumerInactive(listener2, -1);
 
         PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topicName);
         PersistentSubscription subRef = topicRef.getSubscription(subName);
@@ -147,6 +213,9 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
         }
         consumer1.close();
         Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
+
+        verifyConsumerActive(listener2, -1);
+        verifyConsumerNotReceiveAnyStateChanges(listener1);
         for (int i = 5; i < numMsgs; i++) {
             msg = consumer2.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
@@ -195,9 +264,11 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
         futures.clear();
 
         // 7. consumer subscription should not send messages to the new 
consumer if its name is not highest in the list
+        TestConsumerStateEventListener listener3 = new 
TestConsumerStateEventListener();
         ConsumerConfiguration consumerConf3 = new ConsumerConfiguration();
         consumerConf3.setSubscriptionType(SubscriptionType.Failover);
         consumerConf3.setConsumerName("3");
+        consumerConf3.setConsumerEventListener(listener3);
         for (int i = 0; i < 5; i++) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
@@ -206,6 +277,9 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
         }
         Consumer consumer3 = pulsarClient.subscribe(topicName, subName, 
consumerConf3);
         Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
+
+        verifyConsumerInactive(listener3, -1);
+
         Assert.assertNull(consumer3.receive(1, TimeUnit.SECONDS));
         for (int i = 5; i < numMsgs; i++) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
@@ -247,7 +321,7 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
         admin.persistentTopics().delete(topicName);
     }
 
-    @Test(enabled = false)
+    @Test
     public void testSimpleConsumerEventsWithPartition() throws Exception {
         int numPartitions = 4;
 
@@ -261,12 +335,18 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
 
         ProducerConfiguration producerConf = new ProducerConfiguration();
         
producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        ActiveInactiveListenerEvent listener1 = new 
ActiveInactiveListenerEvent();
         ConsumerConfiguration consumerConf1 = new ConsumerConfiguration();
         consumerConf1.setSubscriptionType(SubscriptionType.Failover);
         consumerConf1.setConsumerName("1");
+        consumerConf1.setConsumerEventListener(listener1);
+
+        ActiveInactiveListenerEvent listener2 = new 
ActiveInactiveListenerEvent();
         ConsumerConfiguration consumerConf2 = new ConsumerConfiguration();
         consumerConf2.setSubscriptionType(SubscriptionType.Failover);
         consumerConf2.setConsumerName("2");
+        consumerConf2.setConsumerEventListener(listener2);
 
         // 1. two consumers on the same subscription
         Consumer consumer1 = pulsarClient.subscribe(topicName, subName, 
consumerConf1);
@@ -298,6 +378,7 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
         // equal distribution between both consumers
         int totalMessages = 0;
         Message msg = null;
+        Set<Integer> receivedPtns = Sets.newHashSet();
         while (true) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
             if (msg == null) {
@@ -305,8 +386,16 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
             }
             totalMessages++;
             consumer1.acknowledge(msg);
+            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+            receivedPtns.add(msgId.getPartitionIndex());
         }
+
+        assertTrue(Sets.difference(listener1.activePtns, 
receivedPtns).isEmpty());
+        assertTrue(Sets.difference(listener2.inactivePtns, 
receivedPtns).isEmpty());
+
         Assert.assertEquals(totalMessages, numMsgs / 2);
+
+        receivedPtns = Sets.newHashSet();
         while (true) {
             msg = consumer2.receive(1, TimeUnit.SECONDS);
             if (msg == null) {
@@ -314,7 +403,12 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
             }
             totalMessages++;
             consumer2.acknowledge(msg);
+            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+            receivedPtns.add(msgId.getPartitionIndex());
         }
+        assertTrue(Sets.difference(listener1.inactivePtns, 
receivedPtns).isEmpty());
+        assertTrue(Sets.difference(listener2.activePtns, 
receivedPtns).isEmpty());
+
         Assert.assertEquals(totalMessages, numMsgs);
         Assert.assertEquals(disp0.getActiveConsumer().consumerName(), 
consumerConf1.getConsumerName());
         Assert.assertEquals(disp1.getActiveConsumer().consumerName(), 
consumerConf2.getConsumerName());
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index fc773fe..018f4ab 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -921,6 +921,13 @@ void ClientConnection::handleIncomingCommand() {
                     break;
                 }
 
+                case BaseCommand::ACTIVE_CONSUMER_CHANGE: {
+                    LOG_DEBUG(cnxString_ << "Received notification about 
active consumer changes");
+                    // ignore this message for now.
+                    // TODO: 
@link{https://github.com/apache/incubator-pulsar/issues/1240}
+                    break;
+                }
+
                 default: {
                     LOG_WARN(cnxString_ << "Received invalid message from 
server");
                     close();
diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 70d61df..e0245c5 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -381,6 +381,9 @@ std::string Commands::messageType(BaseCommand_Type type) {
         case BaseCommand::SEEK:
             return "SEEK";
             break;
+        case BaseCommand::ACTIVE_CONSUMER_CHANGE:
+            return "ACTIVE_CONSUMER_CHANGE";
+            break;
     };
 }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 76928bd..00e4537 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -46,6 +46,8 @@ public class ConsumerConfiguration implements Serializable {
 
     private MessageListener messageListener;
 
+    private ConsumerEventListener consumerEventListener;
+
     private int receiverQueueSize = 1000;
 
     private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
@@ -132,6 +134,33 @@ public class ConsumerConfiguration implements Serializable 
{
     }
 
     /**
+     * @return this configured {@link ConsumerEventListener} for the consumer.
+     * @see #setConsumerEventListener(ConsumerEventListener)
+     * @since 2.0
+     */
+    public ConsumerEventListener getConsumerEventListener() {
+        return this.consumerEventListener;
+    }
+
+    /**
+     * Sets a {@link ConsumerEventListener} for the consumer.
+     *
+     * <p>The consumer group listener is used for receiving consumer state 
change in a consumer group for failover
+     * subscription. Application can then react to the consumer state changes.
+     *
+     * <p>This change is experimental. It is subject to changes coming in 
release 2.0.
+     *
+     * @param listener the consumer group listener object
+     * @return consumer configuration
+     * @since 2.0
+     */
+    public ConsumerConfiguration 
setConsumerEventListener(ConsumerEventListener listener) {
+        checkNotNull(listener);
+        this.consumerEventListener = listener;
+        return this;
+    }
+
+    /**
      * @return the configure receiver queue size value
      */
     public int getReceiverQueueSize() {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
new file mode 100644
index 0000000..aeb8bbb
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Listener on the consumer state changes.
+ */
+public interface ConsumerEventListener {
+
+    /**
+     * Notified when the consumer group is changed, and the consumer becomes 
the active consumer.
+     */
+    void becameActive(Consumer consumer, int partitionId);
+
+    /**
+     * Notified when the consumer group is changed, and the consumer is still 
inactive or becomes inactive.
+     */
+    void becameInactive(Consumer consumer, int partitionId);
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 58cdede..43c49b2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -40,6 +40,7 @@ import 
org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
 import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
@@ -253,6 +254,19 @@ public class ClientCnx extends PulsarHandler {
     }
 
     @Override
+    protected void handleActiveConsumerChange(CommandActiveConsumerChange 
change) {
+        checkArgument(state == State.Ready);
+
+        if (log.isDebugEnabled()) {
+            log.debug("{} Received a consumer group change message from the 
server : {}", ctx.channel(), change);
+        }
+        ConsumerImpl consumer = consumers.get(change.getConsumerId());
+        if (consumer != null) {
+            consumer.activeConsumerChanged(change.getIsActive());
+        }
+    }
+
+    @Override
     protected void handleSuccess(CommandSuccess success) {
         checkArgument(state == State.Ready);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index be5e658..1886c76 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
@@ -54,6 +55,7 @@ public abstract class ConsumerBase extends HandlerBase 
implements Consumer {
     protected final String consumerName;
     protected final CompletableFuture<Consumer> subscribeFuture;
     protected final MessageListener listener;
+    protected final ConsumerEventListener consumerEventListener;
     protected final ExecutorService listenerExecutor;
     final BlockingQueue<Message> incomingMessages;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message>> 
pendingReceives;
@@ -68,6 +70,7 @@ public abstract class ConsumerBase extends HandlerBase 
implements Consumer {
         this.consumerName = conf.getConsumerName() == null ? 
ConsumerName.generateRandomName() : conf.getConsumerName();
         this.subscribeFuture = subscribeFuture;
         this.listener = conf.getMessageListener();
+        this.consumerEventListener = conf.getConsumerEventListener();
         if (receiverQueueSize <= 1) {
             this.incomingMessages = Queues.newArrayBlockingQueue(1);
         } else {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a436899..219479f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -747,6 +747,20 @@ public class ConsumerImpl extends ConsumerBase {
         }
     }
 
+    void activeConsumerChanged(boolean isActive) {
+        if (consumerEventListener == null) {
+            return;
+        }
+
+        listenerExecutor.submit(() -> {
+            if (isActive) {
+                consumerEventListener.becameActive(this, partitionIndex);
+            } else {
+                consumerEventListener.becameInactive(this, partitionIndex);
+            }
+        });
+    }
+
     void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, 
ClientCnx cnx) {
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Received message: {}/{}", topic, subscription, 
messageId.getLedgerId(),
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 60e3928..92cbea2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -439,7 +439,9 @@ public class PartitionedConsumerImpl extends ConsumerBase {
         
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
         internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
         internalConsumerConfig.setConsumerName(consumerName);
-
+        if (null != conf.getConsumerEventListener()) {
+            
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
+        }
         int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
                 conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / 
numPartitions);
         internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index f6bd759..029f712 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -251,6 +251,13 @@ public class PulsarClientImpl implements PulsarClient {
                             "Read compacted can only be used with exclusive of 
failover persistent subscriptions"));
         }
 
+        if (conf.getConsumerEventListener() != null
+            && conf.getSubscriptionType() != SubscriptionType.Failover) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException(
+                        "Active consumer listener is only supported for 
failover subscription"));
+        }
+
         CompletableFuture<Consumer> consumerSubscribedFuture = new 
CompletableFuture<>();
 
         getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 2912a73..a395233 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -40,6 +40,7 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
@@ -360,6 +361,19 @@ public class Commands {
         return res;
     }
 
+    public static ByteBuf newActiveConsumerChange(long consumerId, boolean 
isActive) {
+        CommandActiveConsumerChange.Builder changeBuilder = 
CommandActiveConsumerChange.newBuilder()
+            .setConsumerId(consumerId)
+            .setIsActive(isActive);
+
+        CommandActiveConsumerChange change = changeBuilder.build();
+        ByteBuf res = serializeWithSize(
+            
BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change));
+        changeBuilder.recycle();
+        change.recycle();
+        return res;
+    }
+
     public static ByteBuf newSeek(long consumerId, long requestId, long 
ledgerId, long entryId) {
         CommandSeek.Builder seekBuilder = CommandSeek.newBuilder();
         seekBuilder.setConsumerId(consumerId);
@@ -956,4 +970,8 @@ public class Commands {
     public static boolean peerSupportsGetLastMessageId(int peerVersion) {
         return peerVersion >= ProtocolVersion.v12.getNumber();
     }
+
+    public static boolean peerSupportsActiveConsumerListener(int peerVersion) {
+        return peerVersion >= ProtocolVersion.v12.getNumber();
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index 8d2389b..2056199 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -23,6 +23,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
@@ -52,7 +53,6 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -266,6 +266,11 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                 
handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse());
                 cmd.getGetLastMessageIdResponse().recycle();
                 break;
+
+            case ACTIVE_CONSUMER_CHANGE:
+                handleActiveConsumerChange(cmd.getActiveConsumerChange());
+                cmd.getActiveConsumerChange().recycle();
+                break;
             }
         } finally {
             if (cmdBuilder != null) {
@@ -350,6 +355,10 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
+    protected void handleActiveConsumerChange(CommandActiveConsumerChange 
change) {
+        throw new UnsupportedOperationException();
+    }
+
     protected void handleSuccess(CommandSuccess success) {
         throw new UnsupportedOperationException();
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 27cc63a..80df9b8 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -13624,6 +13624,393 @@ public final class PulsarApi {
     // @@protoc_insertion_point(class_scope:pulsar.proto.CommandAck)
   }
   
+  public interface CommandActiveConsumerChangeOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required uint64 consumer_id = 1;
+    boolean hasConsumerId();
+    long getConsumerId();
+    
+    // optional bool is_active = 2 [default = false];
+    boolean hasIsActive();
+    boolean getIsActive();
+  }
+  public static final class CommandActiveConsumerChange extends
+      com.google.protobuf.GeneratedMessageLite
+      implements CommandActiveConsumerChangeOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use CommandActiveConsumerChange.newBuilder() to construct.
+    private final io.netty.util.Recycler.Handle<CommandActiveConsumerChange> 
handle;
+    private 
CommandActiveConsumerChange(io.netty.util.Recycler.Handle<CommandActiveConsumerChange>
 handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandActiveConsumerChange> 
RECYCLER = new io.netty.util.Recycler<CommandActiveConsumerChange>() {
+            protected CommandActiveConsumerChange 
newObject(Handle<CommandActiveConsumerChange> handle) {
+              return new CommandActiveConsumerChange(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this);
+        }
+         
+    private CommandActiveConsumerChange(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final CommandActiveConsumerChange defaultInstance;
+    public static CommandActiveConsumerChange getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandActiveConsumerChange getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 consumer_id = 1;
+    public static final int CONSUMER_ID_FIELD_NUMBER = 1;
+    private long consumerId_;
+    public boolean hasConsumerId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getConsumerId() {
+      return consumerId_;
+    }
+    
+    // optional bool is_active = 2 [default = false];
+    public static final int IS_ACTIVE_FIELD_NUMBER = 2;
+    private boolean isActive_;
+    public boolean hasIsActive() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public boolean getIsActive() {
+      return isActive_;
+    }
+    
+    private void initFields() {
+      consumerId_ = 0L;
+      isActive_ = false;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasConsumerId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, consumerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBool(2, isActive_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(1, consumerId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(2, isActive_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange, 
Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChangeOrBuilder,
 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> 
handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        consumerId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        isActive_ = false;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
build() {
+        
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result 
= buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result 
= buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
buildPartial() {
+        
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result 
= 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.consumerId_ = consumerId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.isActive_ = isActive_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange
 other) {
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance())
 return this;
+        if (other.hasConsumerId()) {
+          setConsumerId(other.getConsumerId());
+        }
+        if (other.hasIsActive()) {
+          setIsActive(other.getIsActive());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasConsumerId()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              consumerId_ = input.readUInt64();
+              break;
+            }
+            case 16: {
+              bitField0_ |= 0x00000002;
+              isActive_ = input.readBool();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required uint64 consumer_id = 1;
+      private long consumerId_ ;
+      public boolean hasConsumerId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getConsumerId() {
+        return consumerId_;
+      }
+      public Builder setConsumerId(long value) {
+        bitField0_ |= 0x00000001;
+        consumerId_ = value;
+        
+        return this;
+      }
+      public Builder clearConsumerId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        consumerId_ = 0L;
+        
+        return this;
+      }
+      
+      // optional bool is_active = 2 [default = false];
+      private boolean isActive_ ;
+      public boolean hasIsActive() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public boolean getIsActive() {
+        return isActive_;
+      }
+      public Builder setIsActive(boolean value) {
+        bitField0_ |= 0x00000002;
+        isActive_ = value;
+        
+        return this;
+      }
+      public Builder clearIsActive() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        isActive_ = false;
+        
+        return this;
+      }
+      
+      // 
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandActiveConsumerChange)
+    }
+    
+    static {
+      defaultInstance = new CommandActiveConsumerChange(true);
+      defaultInstance.initFields();
+    }
+    
+    // 
@@protoc_insertion_point(class_scope:pulsar.proto.CommandActiveConsumerChange)
+  }
+  
   public interface CommandFlowOrBuilder
       extends com.google.protobuf.MessageLiteOrBuilder {
     
@@ -21017,6 +21404,10 @@ public final class PulsarApi {
     // optional .pulsar.proto.CommandGetLastMessageIdResponse 
getLastMessageIdResponse = 30;
     boolean hasGetLastMessageIdResponse();
     
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse 
getGetLastMessageIdResponse();
+    
+    // optional .pulsar.proto.CommandActiveConsumerChange 
active_consumer_change = 31;
+    boolean hasActiveConsumerChange();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
getActiveConsumerChange();
   }
   public static final class BaseCommand extends
       com.google.protobuf.GeneratedMessageLite
@@ -21085,6 +21476,7 @@ public final class PulsarApi {
       SEEK(26, 28),
       GET_LAST_MESSAGE_ID(27, 29),
       GET_LAST_MESSAGE_ID_RESPONSE(28, 30),
+      ACTIVE_CONSUMER_CHANGE(29, 31),
       ;
       
       public static final int CONNECT_VALUE = 2;
@@ -21116,6 +21508,7 @@ public final class PulsarApi {
       public static final int SEEK_VALUE = 28;
       public static final int GET_LAST_MESSAGE_ID_VALUE = 29;
       public static final int GET_LAST_MESSAGE_ID_RESPONSE_VALUE = 30;
+      public static final int ACTIVE_CONSUMER_CHANGE_VALUE = 31;
       
       
       public final int getNumber() { return value; }
@@ -21151,6 +21544,7 @@ public final class PulsarApi {
           case 28: return SEEK;
           case 29: return GET_LAST_MESSAGE_ID;
           case 30: return GET_LAST_MESSAGE_ID_RESPONSE;
+          case 31: return ACTIVE_CONSUMER_CHANGE;
           default: return null;
         }
       }
@@ -21477,6 +21871,16 @@ public final class PulsarApi {
       return getLastMessageIdResponse_;
     }
     
+    // optional .pulsar.proto.CommandActiveConsumerChange 
active_consumer_change = 31;
+    public static final int ACTIVE_CONSUMER_CHANGE_FIELD_NUMBER = 31;
+    private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
activeConsumerChange_;
+    public boolean hasActiveConsumerChange() {
+      return ((bitField0_ & 0x40000000) == 0x40000000);
+    }
+    public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
getActiveConsumerChange() {
+      return activeConsumerChange_;
+    }
+    
     private void initFields() {
       type_ = 
org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT;
       connect_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance();
@@ -21508,6 +21912,7 @@ public final class PulsarApi {
       seek_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance();
       getLastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
       getLastMessageIdResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
+      activeConsumerChange_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -21680,6 +22085,12 @@ public final class PulsarApi {
           return false;
         }
       }
+      if (hasActiveConsumerChange()) {
+        if (!getActiveConsumerChange().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -21782,6 +22193,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x20000000) == 0x20000000)) {
         output.writeMessage(30, getLastMessageIdResponse_);
       }
+      if (((bitField0_ & 0x40000000) == 0x40000000)) {
+        output.writeMessage(31, activeConsumerChange_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -21910,6 +22324,10 @@ public final class PulsarApi {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(30, getLastMessageIdResponse_);
       }
+      if (((bitField0_ & 0x40000000) == 0x40000000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(31, activeConsumerChange_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -22083,6 +22501,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x10000000);
         getLastMessageIdResponse_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
         bitField0_ = (bitField0_ & ~0x20000000);
+        activeConsumerChange_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x40000000);
         return this;
       }
       
@@ -22236,6 +22656,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x20000000;
         }
         result.getLastMessageIdResponse_ = getLastMessageIdResponse_;
+        if (((from_bitField0_ & 0x40000000) == 0x40000000)) {
+          to_bitField0_ |= 0x40000000;
+        }
+        result.activeConsumerChange_ = activeConsumerChange_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -22332,6 +22756,9 @@ public final class PulsarApi {
         if (other.hasGetLastMessageIdResponse()) {
           mergeGetLastMessageIdResponse(other.getGetLastMessageIdResponse());
         }
+        if (other.hasActiveConsumerChange()) {
+          mergeActiveConsumerChange(other.getActiveConsumerChange());
+        }
         return this;
       }
       
@@ -22502,6 +22929,12 @@ public final class PulsarApi {
             return false;
           }
         }
+        if (hasActiveConsumerChange()) {
+          if (!getActiveConsumerChange().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -22826,6 +23259,16 @@ public final class PulsarApi {
               subBuilder.recycle();
               break;
             }
+            case 250: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.Builder
 subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder();
+              if (hasActiveConsumerChange()) {
+                subBuilder.mergeFrom(getActiveConsumerChange());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setActiveConsumerChange(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
@@ -24103,6 +24546,49 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional .pulsar.proto.CommandActiveConsumerChange 
active_consumer_change = 31;
+      private 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
activeConsumerChange_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
+      public boolean hasActiveConsumerChange() {
+        return ((bitField0_ & 0x40000000) == 0x40000000);
+      }
+      public 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange 
getActiveConsumerChange() {
+        return activeConsumerChange_;
+      }
+      public Builder 
setActiveConsumerChange(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange
 value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        activeConsumerChange_ = value;
+        
+        bitField0_ |= 0x40000000;
+        return this;
+      }
+      public Builder setActiveConsumerChange(
+          
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.Builder
 builderForValue) {
+        activeConsumerChange_ = builderForValue.build();
+        
+        bitField0_ |= 0x40000000;
+        return this;
+      }
+      public Builder 
mergeActiveConsumerChange(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange
 value) {
+        if (((bitField0_ & 0x40000000) == 0x40000000) &&
+            activeConsumerChange_ != 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance())
 {
+          activeConsumerChange_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder(activeConsumerChange_).mergeFrom(value).buildPartial();
+        } else {
+          activeConsumerChange_ = value;
+        }
+        
+        bitField0_ |= 0x40000000;
+        return this;
+      }
+      public Builder clearActiveConsumerChange() {
+        activeConsumerChange_ = 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x40000000);
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index ca3d2fb..4a1112d 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -135,7 +135,8 @@ enum ProtocolVersion {
        v9 = 9;  // Added end of topic notification
        v10 = 10;// Added proxy to broker
        v11 = 11;// C++ consumers before this version are not correctly 
handling the checksum field
-       v12 = 12;//Added get topic's last messageId from broker
+       v12 = 12;// Added get topic's last messageId from broker
+                 // Added CommandActiveConsumerChange
 }
 
 message CommandConnect {
@@ -324,6 +325,12 @@ message CommandAck {
         repeated KeyLongValue properties = 5;
 }
 
+// changes on active consumer
+message CommandActiveConsumerChange {
+        required uint64 consumer_id    = 1;
+        optional bool is_active     = 2 [default = false];
+}
+
 message CommandFlow {
        required uint64 consumer_id       = 1;
 
@@ -500,6 +507,8 @@ message BaseCommand {
 
                GET_LAST_MESSAGE_ID = 29;
                GET_LAST_MESSAGE_ID_RESPONSE = 30;
+
+                ACTIVE_CONSUMER_CHANGE = 31;
        }
 
        required Type type = 1;
@@ -544,5 +553,6 @@ message BaseCommand {
        optional CommandGetLastMessageId getLastMessageId = 29;
        optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30;
 
+        optional CommandActiveConsumerChange active_consumer_change = 31;
 
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java
new file mode 100644
index 0000000..c57a8c8
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.api;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test of {@link PulsarDecoder}.
+ */
+public class PulsarDecoderTest {
+
+    private PulsarDecoder decoder;
+
+    @BeforeMethod
+    public void setup() {
+        this.decoder = mock(PulsarDecoder.class, CALLS_REAL_METHODS);
+    }
+
+    @Test
+    public void testChannelRead() throws Exception {
+        long consumerId = 1234L;
+        ByteBuf changeBuf = Commands.newActiveConsumerChange(consumerId, true);
+        ByteBuf cmdBuf = changeBuf.slice(4, changeBuf.writerIndex() - 4);
+
+        
doNothing().when(decoder).handleActiveConsumerChange(any(CommandActiveConsumerChange.class));
+        decoder.channelRead(mock(ChannelHandlerContext.class), cmdBuf);
+
+        verify(decoder, times(1))
+            
.handleActiveConsumerChange(any(CommandActiveConsumerChange.class));
+    }
+
+
+}

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to