This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f2160c0 Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group (#1156) f2160c0 is described below commit f2160c01e3581f0e8374db5d4d713810de7533af Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Thu Feb 15 12:12:00 2018 +0800 Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group (#1156) * Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group * Rename ConsumerGroupListener to ActiveConsumerListener * Fail subscribe if active consumer listener is provided with non-failover subscription. * Notify consumer state change only after the cursor is rewinded. * Address comments * Rename ActiveConsumerListener to ConsumerEventListener Use consumer object as identifier for comparison * Fix test after rebase * Fix license headers * Ignore active consumer change command in cpp client * rename "become" to "became" --- .../AbstractDispatcherSingleActiveConsumer.java | 38 +- .../org/apache/pulsar/broker/service/Consumer.java | 15 + .../PersistentDispatcherSingleActiveConsumer.java | 10 +- .../PersistentDispatcherFailoverConsumerTest.java | 157 ++++++- .../broker/service/PersistentFailoverE2ETest.java | 96 +++- pulsar-client-cpp/lib/ClientConnection.cc | 7 + pulsar-client-cpp/lib/Commands.cc | 3 + .../pulsar/client/api/ConsumerConfiguration.java | 29 ++ .../pulsar/client/api/ConsumerEventListener.java | 36 ++ .../org/apache/pulsar/client/impl/ClientCnx.java | 14 + .../apache/pulsar/client/impl/ConsumerBase.java | 3 + .../apache/pulsar/client/impl/ConsumerImpl.java | 14 + .../client/impl/PartitionedConsumerImpl.java | 4 +- .../pulsar/client/impl/PulsarClientImpl.java | 7 + .../org/apache/pulsar/common/api/Commands.java | 18 + .../apache/pulsar/common/api/PulsarDecoder.java | 11 +- .../apache/pulsar/common/api/proto/PulsarApi.java | 486 +++++++++++++++++++++ pulsar-common/src/main/proto/PulsarApi.proto | 12 +- .../pulsar/common/api/PulsarDecoderTest.java | 60 +++ 19 files changed, 993 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 5181df1..1b37f89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -26,9 +26,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.pulsar.broker.service.BrokerServiceException; -import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; +import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.utils.CopyOnWriteArrayList; import org.slf4j.Logger; @@ -72,7 +72,17 @@ public abstract class AbstractDispatcherSingleActiveConsumer { protected abstract boolean isConsumersExceededOnSubscription(); - protected void pickAndScheduleActiveConsumer() { + protected void notifyActiveConsumerChanged(Consumer activeConsumer) { + if (null != activeConsumer && subscriptionType == SubType.Failover) { + consumers.forEach(consumer -> + consumer.notifyActiveConsumerChange(activeConsumer)); + } + } + + /** + * @return the previous active consumer if the consumer is changed, otherwise null. + */ + protected boolean pickAndScheduleActiveConsumer() { checkArgument(!consumers.isEmpty()); consumers.sort((c1, c2) -> c1.consumerName().compareTo(c2.consumerName())); @@ -80,12 +90,15 @@ public abstract class AbstractDispatcherSingleActiveConsumer { int index = partitionIndex % consumers.size(); Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index)); - if (prevConsumer == ACTIVE_CONSUMER_UPDATER.get(this)) { + Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); + if (prevConsumer == activeConsumer) { // Active consumer did not change. Do nothing at this point - return; + return false; + } else { + // If the active consumer is changed, send notification. + scheduleReadOnActiveConsumer(); + return true; } - - scheduleReadOnActiveConsumer(); } public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { @@ -109,8 +122,17 @@ public abstract class AbstractDispatcherSingleActiveConsumer { consumers.add(consumer); - // Pick an active consumer and start it - pickAndScheduleActiveConsumer(); + if (!pickAndScheduleActiveConsumer()) { + // the active consumer is not changed + Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this); + if (null == currentActiveConsumer) { + if (log.isDebugEnabled()) { + log.debug("Current active consumer disappears while adding consumer {}", consumer); + } + } else { + consumer.notifyActiveConsumerChange(currentActiveConsumer); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index fd77ef2..0bb30cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -152,6 +152,21 @@ public class Consumer { return consumerName; } + void notifyActiveConsumerChange(Consumer activeConsumer) { + if (!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion())) { + // if the client is older than `v12`, we don't need to send consumer group changes. + return; + } + + if (log.isDebugEnabled()) { + log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}", + consumerId, topicName, subscription.getName(), activeConsumer); + } + cnx.ctx().writeAndFlush( + Commands.newActiveConsumerChange(consumerId, this == activeConsumer), + cnx.ctx().voidPromise()); + } + public boolean readCompacted() { return readCompacted; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 716e332..40678f8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -85,7 +85,10 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp log.debug("[{}] Rewind cursor and read more entries without delay", name); } cursor.rewind(); - readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this)); + + Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); + notifyActiveConsumerChanged(activeConsumer); + readMoreEntries(activeConsumer); return; } @@ -102,7 +105,10 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp serviceConfig.getActiveConsumerFailoverDelayTimeMillis()); } cursor.rewind(); - readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this)); + + Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); + notifyActiveConsumerChanged(activeConsumer); + readMoreEntries(activeConsumer); readOnActiveConsumerTask = null; }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index a68165d..922b35b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -22,23 +22,29 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMo import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.matches; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; @@ -57,17 +63,18 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.Consumer; -import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; +import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.zookeeper.ZooKeeper; import org.mockito.invocation.InvocationOnMock; @@ -83,9 +90,12 @@ public class PersistentDispatcherFailoverConsumerTest { private BrokerService brokerService; private ManagedLedgerFactory mlFactoryMock; private ServerCnx serverCnx; + private ServerCnx serverCnxWithOldVersion; private ManagedLedger ledgerMock; private ManagedCursor cursorMock; private ConfigurationCacheService configCacheService; + private ChannelHandlerContext channelCtx; + private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges; final String successTopicName = "persistent://part-perf/global/perf.t1/ptopic"; final String failTopicName = "persistent://part-perf/global/perf.t1/pfailTopic"; @@ -115,10 +125,50 @@ public class PersistentDispatcherFailoverConsumerTest { brokerService = spy(new BrokerService(pulsar)); doReturn(brokerService).when(pulsar).getBrokerService(); + consumerChanges = new LinkedBlockingQueue<>(); + this.channelCtx = mock(ChannelHandlerContext.class); + doAnswer(invocationOnMock -> { + ByteBuf buf = invocationOnMock.getArgumentAt(0, ByteBuf.class); + + ByteBuf cmdBuf = buf.retainedSlice(4, buf.writerIndex() - 4); + try { + int cmdSize = (int) cmdBuf.readUnsignedInt(); + int writerIndex = cmdBuf.writerIndex(); + cmdBuf.writerIndex(cmdBuf.readerIndex() + cmdSize); + ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(cmdBuf); + + BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder(); + BaseCommand cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build(); + cmdBuilder.recycle(); + cmdBuf.writerIndex(writerIndex); + cmdInputStream.recycle(); + + if (cmd.hasActiveConsumerChange()) { + consumerChanges.put(cmd.getActiveConsumerChange()); + } + cmd.recycle(); + } finally { + cmdBuf.release(); + } + + return null; + }).when(channelCtx).writeAndFlush(any(), any()); + serverCnx = spy(new ServerCnx(brokerService)); doReturn(true).when(serverCnx).isActive(); doReturn(true).when(serverCnx).isWritable(); doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); + when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber()); + when(serverCnx.ctx()).thenReturn(channelCtx); + + serverCnxWithOldVersion = spy(new ServerCnx(brokerService)); + doReturn(true).when(serverCnxWithOldVersion).isActive(); + doReturn(true).when(serverCnxWithOldVersion).isWritable(); + doReturn(new InetSocketAddress("localhost", 1234)) + .when(serverCnxWithOldVersion).clientAddress(); + when(serverCnxWithOldVersion.getRemoteEndpointProtocolVersion()) + .thenReturn(ProtocolVersion.v11.getNumber()); + when(serverCnxWithOldVersion.ctx()).thenReturn(channelCtx); NamespaceService nsSvc = mock(NamespaceService.class); doReturn(nsSvc).when(pulsar).getNamespaceService(); @@ -193,6 +243,51 @@ public class PersistentDispatcherFailoverConsumerTest { }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject()); } + private void verifyActiveConsumerChange(CommandActiveConsumerChange change, + long consumerId, + boolean isActive) { + assertEquals(consumerId, change.getConsumerId()); + assertEquals(isActive, change.getIsActive()); + change.recycle(); + } + + @Test + public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); + + int partitionIndex = 0; + PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock, + SubType.Failover, partitionIndex, topic); + + // 1. Verify no consumers connected + assertFalse(pdfc.isConsumerConnected()); + + // 2. Add old consumer + Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, + "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false); + pdfc.addConsumer(consumer1); + List<Consumer> consumers = pdfc.getConsumers(); + assertTrue(consumers.get(0).consumerName() == consumer1.consumerName()); + assertEquals(1, consumers.size()); + assertNull(consumerChanges.poll()); + + verify(channelCtx, times(0)).write(any()); + + // 3. Add new consumer + Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, + "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false); + pdfc.addConsumer(consumer2); + consumers = pdfc.getConsumers(); + assertTrue(consumers.get(0).consumerName() == consumer1.consumerName()); + assertEquals(2, consumers.size()); + + CommandActiveConsumerChange change = consumerChanges.take(); + verifyActiveConsumerChange(change, 2, false); + + verify(channelCtx, times(1)).writeAndFlush(any(), any()); + } + @Test public void testAddRemoveConsumer() throws Exception { log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---"); @@ -208,13 +303,16 @@ public class PersistentDispatcherFailoverConsumerTest { assertFalse(pdfc.isConsumerConnected()); // 2. Add consumer - Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, + Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */); + false /* read compacted */)); pdfc.addConsumer(consumer1); List<Consumer> consumers = pdfc.getConsumers(); assertTrue(consumers.get(0).consumerName() == consumer1.consumerName()); assertEquals(1, consumers.size()); + CommandActiveConsumerChange change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, true); + verify(consumer1, times(1)).notifyActiveConsumerChange(same(consumer1)); // 3. Add again, duplicate allowed pdfc.addConsumer(consumer1); @@ -224,31 +322,57 @@ public class PersistentDispatcherFailoverConsumerTest { // 4. Verify active consumer assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); + // get the notified with who is the leader + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, true); + verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); // 5. Add another consumer which does not change active consumer - Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */); + Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */)); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); assertEquals(3, consumers.size()); + // get notified with who is the leader + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 2, false); + verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); + verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1)); // 6. Add a consumer which changes active consumer - Consumer consumer0 = new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0, + Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0, "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */); + false /* read compacted */)); pdfc.addConsumer(consumer0); consumers = pdfc.getConsumers(); assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName()); assertEquals(4, consumers.size()); + // all consumers will receive notifications + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 0, true); + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, false); + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, false); + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 2, false); + verify(consumer0, times(1)).notifyActiveConsumerChange(same(consumer0)); + verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); + verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer0)); + verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1)); + verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer0)); + // 7. Remove last consumer pdfc.removeConsumer(consumer2); consumers = pdfc.getConsumers(); assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName()); assertEquals(3, consumers.size()); + // not consumer group changes + assertNull(consumerChanges.poll()); - // 8. Verify if we can unsubscribe when more than one consumer is connected + // 8. Verify if we cannot unsubscribe when more than one consumer is connected assertFalse(pdfc.canUnsubscribe(consumer0)); // 9. Remove active consumer @@ -257,6 +381,12 @@ public class PersistentDispatcherFailoverConsumerTest { assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); assertEquals(2, consumers.size()); + // the remaining consumers will receive notifications + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, true); + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, true); + // 10. Attempt to remove already removed consumer String cause = ""; try { @@ -271,10 +401,11 @@ public class PersistentDispatcherFailoverConsumerTest { consumers = pdfc.getConsumers(); assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); assertEquals(1, consumers.size()); + // not consumer group changes + assertNull(consumerChanges.poll()); // 11. With only one consumer, unsubscribe is allowed assertTrue(pdfc.canUnsubscribe(consumer1)); - } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index c172b01..7bc3141 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -24,10 +24,12 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Sets; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; @@ -35,6 +37,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -42,6 +45,7 @@ import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.util.FutureUtil; @@ -68,22 +72,84 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 2000; + private static class TestConsumerStateEventListener implements ConsumerEventListener { + + final LinkedBlockingQueue<Integer> activeQueue = new LinkedBlockingQueue<>(); + final LinkedBlockingQueue<Integer> inActiveQueue = new LinkedBlockingQueue<>(); + + @Override + public void becameActive(Consumer consumer, int partitionId) { + try { + activeQueue.put(partitionId); + } catch (InterruptedException e) { + } + } + + @Override + public void becameInactive(Consumer consumer, int partitionId) { + try { + inActiveQueue.put(partitionId); + } catch (InterruptedException e) { + } + } + } + + private void verifyConsumerNotReceiveAnyStateChanges(TestConsumerStateEventListener listener) throws Exception { + assertNull(listener.activeQueue.poll()); + assertNull(listener.inActiveQueue.poll()); + } + + private void verifyConsumerActive(TestConsumerStateEventListener listener, int partitionId) throws Exception { + assertEquals(partitionId, listener.activeQueue.take().intValue()); + assertNull(listener.inActiveQueue.poll()); + } + + private void verifyConsumerInactive(TestConsumerStateEventListener listener, int partitionId) throws Exception { + assertEquals(partitionId, listener.inActiveQueue.take().intValue()); + assertNull(listener.activeQueue.poll()); + } + + private static class ActiveInactiveListenerEvent implements ConsumerEventListener { + + private final Set<Integer> activePtns = Sets.newHashSet(); + private final Set<Integer> inactivePtns = Sets.newHashSet(); + + @Override + public synchronized void becameActive(Consumer consumer, int partitionId) { + activePtns.add(partitionId); + inactivePtns.remove(partitionId); + } + + @Override + public synchronized void becameInactive(Consumer consumer, int partitionId) { + activePtns.remove(partitionId); + inactivePtns.add(partitionId); + } + } + @Test public void testSimpleConsumerEventsWithoutPartition() throws Exception { final String topicName = "persistent://prop/use/ns-abc/failover-topic1"; final String subName = "sub1"; final int numMsgs = 100; + TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener(); ConsumerConfiguration consumerConf1 = new ConsumerConfiguration(); consumerConf1.setSubscriptionType(SubscriptionType.Failover); consumerConf1.setConsumerName("1"); + consumerConf1.setConsumerEventListener(listener1); + + TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener(); ConsumerConfiguration consumerConf2 = new ConsumerConfiguration(); consumerConf2.setSubscriptionType(SubscriptionType.Failover); consumerConf2.setConsumerName("2"); + consumerConf2.setConsumerEventListener(listener2); // 1. two consumers on the same subscription Consumer consumer1 = pulsarClient.subscribe(topicName, subName, consumerConf1); Consumer consumer2 = pulsarClient.subscribe(topicName, subName, consumerConf2); + verifyConsumerActive(listener1, -1); + verifyConsumerInactive(listener2, -1); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); PersistentSubscription subRef = topicRef.getSubscription(subName); @@ -147,6 +213,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { } consumer1.close(); Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME); + + verifyConsumerActive(listener2, -1); + verifyConsumerNotReceiveAnyStateChanges(listener1); for (int i = 5; i < numMsgs; i++) { msg = consumer2.receive(1, TimeUnit.SECONDS); Assert.assertNotNull(msg); @@ -195,9 +264,11 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { futures.clear(); // 7. consumer subscription should not send messages to the new consumer if its name is not highest in the list + TestConsumerStateEventListener listener3 = new TestConsumerStateEventListener(); ConsumerConfiguration consumerConf3 = new ConsumerConfiguration(); consumerConf3.setSubscriptionType(SubscriptionType.Failover); consumerConf3.setConsumerName("3"); + consumerConf3.setConsumerEventListener(listener3); for (int i = 0; i < 5; i++) { msg = consumer1.receive(1, TimeUnit.SECONDS); Assert.assertNotNull(msg); @@ -206,6 +277,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { } Consumer consumer3 = pulsarClient.subscribe(topicName, subName, consumerConf3); Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME); + + verifyConsumerInactive(listener3, -1); + Assert.assertNull(consumer3.receive(1, TimeUnit.SECONDS)); for (int i = 5; i < numMsgs; i++) { msg = consumer1.receive(1, TimeUnit.SECONDS); @@ -247,7 +321,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { admin.persistentTopics().delete(topicName); } - @Test(enabled = false) + @Test public void testSimpleConsumerEventsWithPartition() throws Exception { int numPartitions = 4; @@ -261,12 +335,18 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { ProducerConfiguration producerConf = new ProducerConfiguration(); producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); + + ActiveInactiveListenerEvent listener1 = new ActiveInactiveListenerEvent(); ConsumerConfiguration consumerConf1 = new ConsumerConfiguration(); consumerConf1.setSubscriptionType(SubscriptionType.Failover); consumerConf1.setConsumerName("1"); + consumerConf1.setConsumerEventListener(listener1); + + ActiveInactiveListenerEvent listener2 = new ActiveInactiveListenerEvent(); ConsumerConfiguration consumerConf2 = new ConsumerConfiguration(); consumerConf2.setSubscriptionType(SubscriptionType.Failover); consumerConf2.setConsumerName("2"); + consumerConf2.setConsumerEventListener(listener2); // 1. two consumers on the same subscription Consumer consumer1 = pulsarClient.subscribe(topicName, subName, consumerConf1); @@ -298,6 +378,7 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { // equal distribution between both consumers int totalMessages = 0; Message msg = null; + Set<Integer> receivedPtns = Sets.newHashSet(); while (true) { msg = consumer1.receive(1, TimeUnit.SECONDS); if (msg == null) { @@ -305,8 +386,16 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { } totalMessages++; consumer1.acknowledge(msg); + MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + receivedPtns.add(msgId.getPartitionIndex()); } + + assertTrue(Sets.difference(listener1.activePtns, receivedPtns).isEmpty()); + assertTrue(Sets.difference(listener2.inactivePtns, receivedPtns).isEmpty()); + Assert.assertEquals(totalMessages, numMsgs / 2); + + receivedPtns = Sets.newHashSet(); while (true) { msg = consumer2.receive(1, TimeUnit.SECONDS); if (msg == null) { @@ -314,7 +403,12 @@ public class PersistentFailoverE2ETest extends BrokerTestBase { } totalMessages++; consumer2.acknowledge(msg); + MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + receivedPtns.add(msgId.getPartitionIndex()); } + assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty()); + assertTrue(Sets.difference(listener2.activePtns, receivedPtns).isEmpty()); + Assert.assertEquals(totalMessages, numMsgs); Assert.assertEquals(disp0.getActiveConsumer().consumerName(), consumerConf1.getConsumerName()); Assert.assertEquals(disp1.getActiveConsumer().consumerName(), consumerConf2.getConsumerName()); diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index fc773fe..018f4ab 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -921,6 +921,13 @@ void ClientConnection::handleIncomingCommand() { break; } + case BaseCommand::ACTIVE_CONSUMER_CHANGE: { + LOG_DEBUG(cnxString_ << "Received notification about active consumer changes"); + // ignore this message for now. + // TODO: @link{https://github.com/apache/incubator-pulsar/issues/1240} + break; + } + default: { LOG_WARN(cnxString_ << "Received invalid message from server"); close(); diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 70d61df..e0245c5 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -381,6 +381,9 @@ std::string Commands::messageType(BaseCommand_Type type) { case BaseCommand::SEEK: return "SEEK"; break; + case BaseCommand::ACTIVE_CONSUMER_CHANGE: + return "ACTIVE_CONSUMER_CHANGE"; + break; }; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java index 76928bd..00e4537 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java @@ -46,6 +46,8 @@ public class ConsumerConfiguration implements Serializable { private MessageListener messageListener; + private ConsumerEventListener consumerEventListener; + private int receiverQueueSize = 1000; private int maxTotalReceiverQueueSizeAcrossPartitions = 50000; @@ -132,6 +134,33 @@ public class ConsumerConfiguration implements Serializable { } /** + * @return this configured {@link ConsumerEventListener} for the consumer. + * @see #setConsumerEventListener(ConsumerEventListener) + * @since 2.0 + */ + public ConsumerEventListener getConsumerEventListener() { + return this.consumerEventListener; + } + + /** + * Sets a {@link ConsumerEventListener} for the consumer. + * + * <p>The consumer group listener is used for receiving consumer state change in a consumer group for failover + * subscription. Application can then react to the consumer state changes. + * + * <p>This change is experimental. It is subject to changes coming in release 2.0. + * + * @param listener the consumer group listener object + * @return consumer configuration + * @since 2.0 + */ + public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener listener) { + checkNotNull(listener); + this.consumerEventListener = listener; + return this; + } + + /** * @return the configure receiver queue size value */ public int getReceiverQueueSize() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java new file mode 100644 index 0000000..aeb8bbb --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * Listener on the consumer state changes. + */ +public interface ConsumerEventListener { + + /** + * Notified when the consumer group is changed, and the consumer becomes the active consumer. + */ + void becameActive(Consumer consumer, int partitionId); + + /** + * Notified when the consumer group is changed, and the consumer is still inactive or becomes inactive. + */ + void becameInactive(Consumer consumer, int partitionId); + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 58cdede..43c49b2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -40,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClientException.TimeoutException; import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.PulsarHandler; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; @@ -253,6 +254,19 @@ public class ClientCnx extends PulsarHandler { } @Override + protected void handleActiveConsumerChange(CommandActiveConsumerChange change) { + checkArgument(state == State.Ready); + + if (log.isDebugEnabled()) { + log.debug("{} Received a consumer group change message from the server : {}", ctx.channel(), change); + } + ConsumerImpl consumer = consumers.get(change.getConsumerId()); + if (consumer != null) { + consumer.activeConsumerChanged(change.getIsActive()); + } + } + + @Override protected void handleSuccess(CommandSuccess success) { checkArgument(state == State.Ready); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index be5e658..1886c76 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; @@ -54,6 +55,7 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer { protected final String consumerName; protected final CompletableFuture<Consumer> subscribeFuture; protected final MessageListener listener; + protected final ConsumerEventListener consumerEventListener; protected final ExecutorService listenerExecutor; final BlockingQueue<Message> incomingMessages; protected final ConcurrentLinkedQueue<CompletableFuture<Message>> pendingReceives; @@ -68,6 +70,7 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer { this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName(); this.subscribeFuture = subscribeFuture; this.listener = conf.getMessageListener(); + this.consumerEventListener = conf.getConsumerEventListener(); if (receiverQueueSize <= 1) { this.incomingMessages = Queues.newArrayBlockingQueue(1); } else { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a436899..219479f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -747,6 +747,20 @@ public class ConsumerImpl extends ConsumerBase { } } + void activeConsumerChanged(boolean isActive) { + if (consumerEventListener == null) { + return; + } + + listenerExecutor.submit(() -> { + if (isActive) { + consumerEventListener.becameActive(this, partitionIndex); + } else { + consumerEventListener.becameInactive(this, partitionIndex); + } + }); + } + void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java index 60e3928..92cbea2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java @@ -439,7 +439,9 @@ public class PartitionedConsumerImpl extends ConsumerBase { internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize()); internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType()); internalConsumerConfig.setConsumerName(consumerName); - + if (null != conf.getConsumerEventListener()) { + internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener()); + } int receiverQueueSize = Math.min(conf.getReceiverQueueSize(), conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions); internalConsumerConfig.setReceiverQueueSize(receiverQueueSize); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index f6bd759..029f712 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -251,6 +251,13 @@ public class PulsarClientImpl implements PulsarClient { "Read compacted can only be used with exclusive of failover persistent subscriptions")); } + if (conf.getConsumerEventListener() != null + && conf.getSubscriptionType() != SubscriptionType.Failover) { + return FutureUtil.failedFuture( + new PulsarClientException.InvalidConfigurationException( + "Active consumer listener is only supported for failover subscription")); + } + CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>(); getPartitionedTopicMetadata(topic).thenAccept(metadata -> { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 2912a73..a395233 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -40,6 +40,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandError; import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow; @@ -360,6 +361,19 @@ public class Commands { return res; } + public static ByteBuf newActiveConsumerChange(long consumerId, boolean isActive) { + CommandActiveConsumerChange.Builder changeBuilder = CommandActiveConsumerChange.newBuilder() + .setConsumerId(consumerId) + .setIsActive(isActive); + + CommandActiveConsumerChange change = changeBuilder.build(); + ByteBuf res = serializeWithSize( + BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change)); + changeBuilder.recycle(); + change.recycle(); + return res; + } + public static ByteBuf newSeek(long consumerId, long requestId, long ledgerId, long entryId) { CommandSeek.Builder seekBuilder = CommandSeek.newBuilder(); seekBuilder.setConsumerId(consumerId); @@ -956,4 +970,8 @@ public class Commands { public static boolean peerSupportsGetLastMessageId(int peerVersion) { return peerVersion >= ProtocolVersion.v12.getNumber(); } + + public static boolean peerSupportsActiveConsumerListener(int peerVersion) { + return peerVersion >= ProtocolVersion.v12.getNumber(); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java index 8d2389b..2056199 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect; @@ -52,7 +53,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -266,6 +266,11 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter { handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse()); cmd.getGetLastMessageIdResponse().recycle(); break; + + case ACTIVE_CONSUMER_CHANGE: + handleActiveConsumerChange(cmd.getActiveConsumerChange()); + cmd.getActiveConsumerChange().recycle(); + break; } } finally { if (cmdBuilder != null) { @@ -350,6 +355,10 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter { throw new UnsupportedOperationException(); } + protected void handleActiveConsumerChange(CommandActiveConsumerChange change) { + throw new UnsupportedOperationException(); + } + protected void handleSuccess(CommandSuccess success) { throw new UnsupportedOperationException(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 27cc63a..80df9b8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -13624,6 +13624,393 @@ public final class PulsarApi { // @@protoc_insertion_point(class_scope:pulsar.proto.CommandAck) } + public interface CommandActiveConsumerChangeOrBuilder + extends com.google.protobuf.MessageLiteOrBuilder { + + // required uint64 consumer_id = 1; + boolean hasConsumerId(); + long getConsumerId(); + + // optional bool is_active = 2 [default = false]; + boolean hasIsActive(); + boolean getIsActive(); + } + public static final class CommandActiveConsumerChange extends + com.google.protobuf.GeneratedMessageLite + implements CommandActiveConsumerChangeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage { + // Use CommandActiveConsumerChange.newBuilder() to construct. + private final io.netty.util.Recycler.Handle<CommandActiveConsumerChange> handle; + private CommandActiveConsumerChange(io.netty.util.Recycler.Handle<CommandActiveConsumerChange> handle) { + this.handle = handle; + } + + private static final io.netty.util.Recycler<CommandActiveConsumerChange> RECYCLER = new io.netty.util.Recycler<CommandActiveConsumerChange>() { + protected CommandActiveConsumerChange newObject(Handle<CommandActiveConsumerChange> handle) { + return new CommandActiveConsumerChange(handle); + } + }; + + public void recycle() { + this.initFields(); + this.memoizedIsInitialized = -1; + this.bitField0_ = 0; + this.memoizedSerializedSize = -1; + handle.recycle(this); + } + + private CommandActiveConsumerChange(boolean noInit) { + this.handle = null; + } + + private static final CommandActiveConsumerChange defaultInstance; + public static CommandActiveConsumerChange getDefaultInstance() { + return defaultInstance; + } + + public CommandActiveConsumerChange getDefaultInstanceForType() { + return defaultInstance; + } + + private int bitField0_; + // required uint64 consumer_id = 1; + public static final int CONSUMER_ID_FIELD_NUMBER = 1; + private long consumerId_; + public boolean hasConsumerId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public long getConsumerId() { + return consumerId_; + } + + // optional bool is_active = 2 [default = false]; + public static final int IS_ACTIVE_FIELD_NUMBER = 2; + private boolean isActive_; + public boolean hasIsActive() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public boolean getIsActive() { + return isActive_; + } + + private void initFields() { + consumerId_ = 0L; + isActive_ = false; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasConsumerId()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + throw new RuntimeException("Cannot use CodedOutputStream"); + } + + public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeUInt64(1, consumerId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBool(2, isActive_); + } + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(1, consumerId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(2, isActive_); + } + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessageLite.Builder< + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange, Builder> + implements org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChangeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder { + // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder() + private final io.netty.util.Recycler.Handle<Builder> handle; + private Builder(io.netty.util.Recycler.Handle<Builder> handle) { + this.handle = handle; + maybeForceBuilderInitialization(); + } + private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() { + protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) { + return new Builder(handle); + } + }; + + public void recycle() { + clear(); + handle.recycle(this); + } + + private void maybeForceBuilderInitialization() { + } + private static Builder create() { + return RECYCLER.get(); + } + + public Builder clear() { + super.clear(); + consumerId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); + isActive_ = false; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange getDefaultInstanceForType() { + return org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance(); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange build() { + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange buildPartial() { + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.RECYCLER.get(); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.consumerId_ = consumerId_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.isActive_ = isActive_; + result.bitField0_ = to_bitField0_; + return result; + } + + public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange other) { + if (other == org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance()) return this; + if (other.hasConsumerId()) { + setConsumerId(other.getConsumerId()); + } + if (other.hasIsActive()) { + setIsActive(other.getIsActive()); + } + return this; + } + + public final boolean isInitialized() { + if (!hasConsumerId()) { + + return false; + } + return true; + } + + public Builder mergeFrom(com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + throw new java.io.IOException("Merge from CodedInputStream is disabled"); + } + public Builder mergeFrom( + org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + + return this; + default: { + if (!input.skipField(tag)) { + + return this; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + consumerId_ = input.readUInt64(); + break; + } + case 16: { + bitField0_ |= 0x00000002; + isActive_ = input.readBool(); + break; + } + } + } + } + + private int bitField0_; + + // required uint64 consumer_id = 1; + private long consumerId_ ; + public boolean hasConsumerId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public long getConsumerId() { + return consumerId_; + } + public Builder setConsumerId(long value) { + bitField0_ |= 0x00000001; + consumerId_ = value; + + return this; + } + public Builder clearConsumerId() { + bitField0_ = (bitField0_ & ~0x00000001); + consumerId_ = 0L; + + return this; + } + + // optional bool is_active = 2 [default = false]; + private boolean isActive_ ; + public boolean hasIsActive() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public boolean getIsActive() { + return isActive_; + } + public Builder setIsActive(boolean value) { + bitField0_ |= 0x00000002; + isActive_ = value; + + return this; + } + public Builder clearIsActive() { + bitField0_ = (bitField0_ & ~0x00000002); + isActive_ = false; + + return this; + } + + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandActiveConsumerChange) + } + + static { + defaultInstance = new CommandActiveConsumerChange(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:pulsar.proto.CommandActiveConsumerChange) + } + public interface CommandFlowOrBuilder extends com.google.protobuf.MessageLiteOrBuilder { @@ -21017,6 +21404,10 @@ public final class PulsarApi { // optional .pulsar.proto.CommandGetLastMessageIdResponse getLastMessageIdResponse = 30; boolean hasGetLastMessageIdResponse(); org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getGetLastMessageIdResponse(); + + // optional .pulsar.proto.CommandActiveConsumerChange active_consumer_change = 31; + boolean hasActiveConsumerChange(); + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange getActiveConsumerChange(); } public static final class BaseCommand extends com.google.protobuf.GeneratedMessageLite @@ -21085,6 +21476,7 @@ public final class PulsarApi { SEEK(26, 28), GET_LAST_MESSAGE_ID(27, 29), GET_LAST_MESSAGE_ID_RESPONSE(28, 30), + ACTIVE_CONSUMER_CHANGE(29, 31), ; public static final int CONNECT_VALUE = 2; @@ -21116,6 +21508,7 @@ public final class PulsarApi { public static final int SEEK_VALUE = 28; public static final int GET_LAST_MESSAGE_ID_VALUE = 29; public static final int GET_LAST_MESSAGE_ID_RESPONSE_VALUE = 30; + public static final int ACTIVE_CONSUMER_CHANGE_VALUE = 31; public final int getNumber() { return value; } @@ -21151,6 +21544,7 @@ public final class PulsarApi { case 28: return SEEK; case 29: return GET_LAST_MESSAGE_ID; case 30: return GET_LAST_MESSAGE_ID_RESPONSE; + case 31: return ACTIVE_CONSUMER_CHANGE; default: return null; } } @@ -21477,6 +21871,16 @@ public final class PulsarApi { return getLastMessageIdResponse_; } + // optional .pulsar.proto.CommandActiveConsumerChange active_consumer_change = 31; + public static final int ACTIVE_CONSUMER_CHANGE_FIELD_NUMBER = 31; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange activeConsumerChange_; + public boolean hasActiveConsumerChange() { + return ((bitField0_ & 0x40000000) == 0x40000000); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange getActiveConsumerChange() { + return activeConsumerChange_; + } + private void initFields() { type_ = org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT; connect_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance(); @@ -21508,6 +21912,7 @@ public final class PulsarApi { seek_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance(); getLastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance(); getLastMessageIdResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance(); + activeConsumerChange_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -21680,6 +22085,12 @@ public final class PulsarApi { return false; } } + if (hasActiveConsumerChange()) { + if (!getActiveConsumerChange().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -21782,6 +22193,9 @@ public final class PulsarApi { if (((bitField0_ & 0x20000000) == 0x20000000)) { output.writeMessage(30, getLastMessageIdResponse_); } + if (((bitField0_ & 0x40000000) == 0x40000000)) { + output.writeMessage(31, activeConsumerChange_); + } } private int memoizedSerializedSize = -1; @@ -21910,6 +22324,10 @@ public final class PulsarApi { size += com.google.protobuf.CodedOutputStream .computeMessageSize(30, getLastMessageIdResponse_); } + if (((bitField0_ & 0x40000000) == 0x40000000)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(31, activeConsumerChange_); + } memoizedSerializedSize = size; return size; } @@ -22083,6 +22501,8 @@ public final class PulsarApi { bitField0_ = (bitField0_ & ~0x10000000); getLastMessageIdResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance(); bitField0_ = (bitField0_ & ~0x20000000); + activeConsumerChange_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance(); + bitField0_ = (bitField0_ & ~0x40000000); return this; } @@ -22236,6 +22656,10 @@ public final class PulsarApi { to_bitField0_ |= 0x20000000; } result.getLastMessageIdResponse_ = getLastMessageIdResponse_; + if (((from_bitField0_ & 0x40000000) == 0x40000000)) { + to_bitField0_ |= 0x40000000; + } + result.activeConsumerChange_ = activeConsumerChange_; result.bitField0_ = to_bitField0_; return result; } @@ -22332,6 +22756,9 @@ public final class PulsarApi { if (other.hasGetLastMessageIdResponse()) { mergeGetLastMessageIdResponse(other.getGetLastMessageIdResponse()); } + if (other.hasActiveConsumerChange()) { + mergeActiveConsumerChange(other.getActiveConsumerChange()); + } return this; } @@ -22502,6 +22929,12 @@ public final class PulsarApi { return false; } } + if (hasActiveConsumerChange()) { + if (!getActiveConsumerChange().isInitialized()) { + + return false; + } + } return true; } @@ -22826,6 +23259,16 @@ public final class PulsarApi { subBuilder.recycle(); break; } + case 250: { + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder(); + if (hasActiveConsumerChange()) { + subBuilder.mergeFrom(getActiveConsumerChange()); + } + input.readMessage(subBuilder, extensionRegistry); + setActiveConsumerChange(subBuilder.buildPartial()); + subBuilder.recycle(); + break; + } } } } @@ -24103,6 +24546,49 @@ public final class PulsarApi { return this; } + // optional .pulsar.proto.CommandActiveConsumerChange active_consumer_change = 31; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange activeConsumerChange_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance(); + public boolean hasActiveConsumerChange() { + return ((bitField0_ & 0x40000000) == 0x40000000); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange getActiveConsumerChange() { + return activeConsumerChange_; + } + public Builder setActiveConsumerChange(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange value) { + if (value == null) { + throw new NullPointerException(); + } + activeConsumerChange_ = value; + + bitField0_ |= 0x40000000; + return this; + } + public Builder setActiveConsumerChange( + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.Builder builderForValue) { + activeConsumerChange_ = builderForValue.build(); + + bitField0_ |= 0x40000000; + return this; + } + public Builder mergeActiveConsumerChange(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange value) { + if (((bitField0_ & 0x40000000) == 0x40000000) && + activeConsumerChange_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance()) { + activeConsumerChange_ = + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder(activeConsumerChange_).mergeFrom(value).buildPartial(); + } else { + activeConsumerChange_ = value; + } + + bitField0_ |= 0x40000000; + return this; + } + public Builder clearActiveConsumerChange() { + activeConsumerChange_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance(); + + bitField0_ = (bitField0_ & ~0x40000000); + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index ca3d2fb..4a1112d 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -135,7 +135,8 @@ enum ProtocolVersion { v9 = 9; // Added end of topic notification v10 = 10;// Added proxy to broker v11 = 11;// C++ consumers before this version are not correctly handling the checksum field - v12 = 12;//Added get topic's last messageId from broker + v12 = 12;// Added get topic's last messageId from broker + // Added CommandActiveConsumerChange } message CommandConnect { @@ -324,6 +325,12 @@ message CommandAck { repeated KeyLongValue properties = 5; } +// changes on active consumer +message CommandActiveConsumerChange { + required uint64 consumer_id = 1; + optional bool is_active = 2 [default = false]; +} + message CommandFlow { required uint64 consumer_id = 1; @@ -500,6 +507,8 @@ message BaseCommand { GET_LAST_MESSAGE_ID = 29; GET_LAST_MESSAGE_ID_RESPONSE = 30; + + ACTIVE_CONSUMER_CHANGE = 31; } required Type type = 1; @@ -544,5 +553,6 @@ message BaseCommand { optional CommandGetLastMessageId getLastMessageId = 29; optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30; + optional CommandActiveConsumerChange active_consumer_change = 31; } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java new file mode 100644 index 0000000..c57a8c8 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.api; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Unit test of {@link PulsarDecoder}. + */ +public class PulsarDecoderTest { + + private PulsarDecoder decoder; + + @BeforeMethod + public void setup() { + this.decoder = mock(PulsarDecoder.class, CALLS_REAL_METHODS); + } + + @Test + public void testChannelRead() throws Exception { + long consumerId = 1234L; + ByteBuf changeBuf = Commands.newActiveConsumerChange(consumerId, true); + ByteBuf cmdBuf = changeBuf.slice(4, changeBuf.writerIndex() - 4); + + doNothing().when(decoder).handleActiveConsumerChange(any(CommandActiveConsumerChange.class)); + decoder.channelRead(mock(ChannelHandlerContext.class), cmdBuf); + + verify(decoder, times(1)) + .handleActiveConsumerChange(any(CommandActiveConsumerChange.class)); + } + + +} -- To stop receiving notification emails like this one, please contact mme...@apache.org.