merlimat closed pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 5181df177..1b37f8946 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -26,9 +26,9 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.pulsar.broker.service.BrokerServiceException; -import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; +import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.utils.CopyOnWriteArrayList; import org.slf4j.Logger; @@ -72,7 +72,17 @@ public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int part protected abstract boolean isConsumersExceededOnSubscription(); - protected void pickAndScheduleActiveConsumer() { + protected void notifyActiveConsumerChanged(Consumer activeConsumer) { + if (null != activeConsumer && subscriptionType == SubType.Failover) { + consumers.forEach(consumer -> + consumer.notifyActiveConsumerChange(activeConsumer)); + } + } + + /** + * @return the previous active consumer if the consumer is changed, otherwise null. + */ + protected boolean pickAndScheduleActiveConsumer() { checkArgument(!consumers.isEmpty()); consumers.sort((c1, c2) -> c1.consumerName().compareTo(c2.consumerName())); @@ -80,12 +90,15 @@ protected void pickAndScheduleActiveConsumer() { int index = partitionIndex % consumers.size(); Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index)); - if (prevConsumer == ACTIVE_CONSUMER_UPDATER.get(this)) { + Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); + if (prevConsumer == activeConsumer) { // Active consumer did not change. Do nothing at this point - return; + return false; + } else { + // If the active consumer is changed, send notification. + scheduleReadOnActiveConsumer(); + return true; } - - scheduleReadOnActiveConsumer(); } public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { @@ -109,8 +122,17 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce consumers.add(consumer); - // Pick an active consumer and start it - pickAndScheduleActiveConsumer(); + if (!pickAndScheduleActiveConsumer()) { + // the active consumer is not changed + Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this); + if (null == currentActiveConsumer) { + if (log.isDebugEnabled()) { + log.debug("Current active consumer disappears while adding consumer {}", consumer); + } + } else { + consumer.notifyActiveConsumerChange(currentActiveConsumer); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index fd77ef290..0bb30cfb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -152,6 +152,21 @@ public String consumerName() { return consumerName; } + void notifyActiveConsumerChange(Consumer activeConsumer) { + if (!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion())) { + // if the client is older than `v12`, we don't need to send consumer group changes. + return; + } + + if (log.isDebugEnabled()) { + log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}", + consumerId, topicName, subscription.getName(), activeConsumer); + } + cnx.ctx().writeAndFlush( + Commands.newActiveConsumerChange(consumerId, this == activeConsumer), + cnx.ctx().voidPromise()); + } + public boolean readCompacted() { return readCompacted; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 716e3325e..40678f839 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -85,7 +85,10 @@ protected void scheduleReadOnActiveConsumer() { log.debug("[{}] Rewind cursor and read more entries without delay", name); } cursor.rewind(); - readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this)); + + Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); + notifyActiveConsumerChanged(activeConsumer); + readMoreEntries(activeConsumer); return; } @@ -102,7 +105,10 @@ protected void scheduleReadOnActiveConsumer() { serviceConfig.getActiveConsumerFailoverDelayTimeMillis()); } cursor.rewind(); - readMoreEntries(ACTIVE_CONSUMER_UPDATER.get(this)); + + Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); + notifyActiveConsumerChanged(activeConsumer); + readMoreEntries(activeConsumer); readOnActiveConsumerTask = null; }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index a68165d5f..922b35bc5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -22,23 +22,29 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.matches; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertTrue; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; @@ -57,17 +63,18 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; -import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.Consumer; -import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; +import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.zookeeper.ZooKeeper; import org.mockito.invocation.InvocationOnMock; @@ -83,9 +90,12 @@ private BrokerService brokerService; private ManagedLedgerFactory mlFactoryMock; private ServerCnx serverCnx; + private ServerCnx serverCnxWithOldVersion; private ManagedLedger ledgerMock; private ManagedCursor cursorMock; private ConfigurationCacheService configCacheService; + private ChannelHandlerContext channelCtx; + private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges; final String successTopicName = "persistent://part-perf/global/perf.t1/ptopic"; final String failTopicName = "persistent://part-perf/global/perf.t1/pfailTopic"; @@ -115,10 +125,50 @@ public void setup() throws Exception { brokerService = spy(new BrokerService(pulsar)); doReturn(brokerService).when(pulsar).getBrokerService(); + consumerChanges = new LinkedBlockingQueue<>(); + this.channelCtx = mock(ChannelHandlerContext.class); + doAnswer(invocationOnMock -> { + ByteBuf buf = invocationOnMock.getArgumentAt(0, ByteBuf.class); + + ByteBuf cmdBuf = buf.retainedSlice(4, buf.writerIndex() - 4); + try { + int cmdSize = (int) cmdBuf.readUnsignedInt(); + int writerIndex = cmdBuf.writerIndex(); + cmdBuf.writerIndex(cmdBuf.readerIndex() + cmdSize); + ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(cmdBuf); + + BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder(); + BaseCommand cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build(); + cmdBuilder.recycle(); + cmdBuf.writerIndex(writerIndex); + cmdInputStream.recycle(); + + if (cmd.hasActiveConsumerChange()) { + consumerChanges.put(cmd.getActiveConsumerChange()); + } + cmd.recycle(); + } finally { + cmdBuf.release(); + } + + return null; + }).when(channelCtx).writeAndFlush(any(), any()); + serverCnx = spy(new ServerCnx(brokerService)); doReturn(true).when(serverCnx).isActive(); doReturn(true).when(serverCnx).isWritable(); doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); + when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber()); + when(serverCnx.ctx()).thenReturn(channelCtx); + + serverCnxWithOldVersion = spy(new ServerCnx(brokerService)); + doReturn(true).when(serverCnxWithOldVersion).isActive(); + doReturn(true).when(serverCnxWithOldVersion).isWritable(); + doReturn(new InetSocketAddress("localhost", 1234)) + .when(serverCnxWithOldVersion).clientAddress(); + when(serverCnxWithOldVersion.getRemoteEndpointProtocolVersion()) + .thenReturn(ProtocolVersion.v11.getNumber()); + when(serverCnxWithOldVersion.ctx()).thenReturn(channelCtx); NamespaceService nsSvc = mock(NamespaceService.class); doReturn(nsSvc).when(pulsar).getNamespaceService(); @@ -193,6 +243,51 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject()); } + private void verifyActiveConsumerChange(CommandActiveConsumerChange change, + long consumerId, + boolean isActive) { + assertEquals(consumerId, change.getConsumerId()); + assertEquals(isActive, change.getIsActive()); + change.recycle(); + } + + @Test + public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); + + int partitionIndex = 0; + PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock, + SubType.Failover, partitionIndex, topic); + + // 1. Verify no consumers connected + assertFalse(pdfc.isConsumerConnected()); + + // 2. Add old consumer + Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, + "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false); + pdfc.addConsumer(consumer1); + List<Consumer> consumers = pdfc.getConsumers(); + assertTrue(consumers.get(0).consumerName() == consumer1.consumerName()); + assertEquals(1, consumers.size()); + assertNull(consumerChanges.poll()); + + verify(channelCtx, times(0)).write(any()); + + // 3. Add new consumer + Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, + "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false); + pdfc.addConsumer(consumer2); + consumers = pdfc.getConsumers(); + assertTrue(consumers.get(0).consumerName() == consumer1.consumerName()); + assertEquals(2, consumers.size()); + + CommandActiveConsumerChange change = consumerChanges.take(); + verifyActiveConsumerChange(change, 2, false); + + verify(channelCtx, times(1)).writeAndFlush(any(), any()); + } + @Test public void testAddRemoveConsumer() throws Exception { log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---"); @@ -208,13 +303,16 @@ public void testAddRemoveConsumer() throws Exception { assertFalse(pdfc.isConsumerConnected()); // 2. Add consumer - Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, + Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */); + false /* read compacted */)); pdfc.addConsumer(consumer1); List<Consumer> consumers = pdfc.getConsumers(); assertTrue(consumers.get(0).consumerName() == consumer1.consumerName()); assertEquals(1, consumers.size()); + CommandActiveConsumerChange change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, true); + verify(consumer1, times(1)).notifyActiveConsumerChange(same(consumer1)); // 3. Add again, duplicate allowed pdfc.addConsumer(consumer1); @@ -224,31 +322,57 @@ public void testAddRemoveConsumer() throws Exception { // 4. Verify active consumer assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); + // get the notified with who is the leader + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, true); + verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); // 5. Add another consumer which does not change active consumer - Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */); + Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */)); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); assertEquals(3, consumers.size()); + // get notified with who is the leader + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 2, false); + verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); + verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1)); // 6. Add a consumer which changes active consumer - Consumer consumer0 = new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0, + Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0, "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */); + false /* read compacted */)); pdfc.addConsumer(consumer0); consumers = pdfc.getConsumers(); assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName()); assertEquals(4, consumers.size()); + // all consumers will receive notifications + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 0, true); + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, false); + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, false); + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 2, false); + verify(consumer0, times(1)).notifyActiveConsumerChange(same(consumer0)); + verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer1)); + verify(consumer1, times(2)).notifyActiveConsumerChange(same(consumer0)); + verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer1)); + verify(consumer2, times(1)).notifyActiveConsumerChange(same(consumer0)); + // 7. Remove last consumer pdfc.removeConsumer(consumer2); consumers = pdfc.getConsumers(); assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName()); assertEquals(3, consumers.size()); + // not consumer group changes + assertNull(consumerChanges.poll()); - // 8. Verify if we can unsubscribe when more than one consumer is connected + // 8. Verify if we cannot unsubscribe when more than one consumer is connected assertFalse(pdfc.canUnsubscribe(consumer0)); // 9. Remove active consumer @@ -257,6 +381,12 @@ public void testAddRemoveConsumer() throws Exception { assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); assertEquals(2, consumers.size()); + // the remaining consumers will receive notifications + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, true); + change = consumerChanges.take(); + verifyActiveConsumerChange(change, 1, true); + // 10. Attempt to remove already removed consumer String cause = ""; try { @@ -271,10 +401,11 @@ public void testAddRemoveConsumer() throws Exception { consumers = pdfc.getConsumers(); assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName()); assertEquals(1, consumers.size()); + // not consumer group changes + assertNull(consumerChanges.poll()); // 11. With only one consumer, unsubscribe is allowed assertTrue(pdfc.canUnsubscribe(consumer1)); - } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index c172b01a8..7bc314155 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -24,10 +24,12 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Sets; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; @@ -35,6 +37,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -42,6 +45,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.util.FutureUtil; @@ -68,22 +72,84 @@ protected void cleanup() throws Exception { private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 2000; + private static class TestConsumerStateEventListener implements ConsumerEventListener { + + final LinkedBlockingQueue<Integer> activeQueue = new LinkedBlockingQueue<>(); + final LinkedBlockingQueue<Integer> inActiveQueue = new LinkedBlockingQueue<>(); + + @Override + public void becameActive(Consumer consumer, int partitionId) { + try { + activeQueue.put(partitionId); + } catch (InterruptedException e) { + } + } + + @Override + public void becameInactive(Consumer consumer, int partitionId) { + try { + inActiveQueue.put(partitionId); + } catch (InterruptedException e) { + } + } + } + + private void verifyConsumerNotReceiveAnyStateChanges(TestConsumerStateEventListener listener) throws Exception { + assertNull(listener.activeQueue.poll()); + assertNull(listener.inActiveQueue.poll()); + } + + private void verifyConsumerActive(TestConsumerStateEventListener listener, int partitionId) throws Exception { + assertEquals(partitionId, listener.activeQueue.take().intValue()); + assertNull(listener.inActiveQueue.poll()); + } + + private void verifyConsumerInactive(TestConsumerStateEventListener listener, int partitionId) throws Exception { + assertEquals(partitionId, listener.inActiveQueue.take().intValue()); + assertNull(listener.activeQueue.poll()); + } + + private static class ActiveInactiveListenerEvent implements ConsumerEventListener { + + private final Set<Integer> activePtns = Sets.newHashSet(); + private final Set<Integer> inactivePtns = Sets.newHashSet(); + + @Override + public synchronized void becameActive(Consumer consumer, int partitionId) { + activePtns.add(partitionId); + inactivePtns.remove(partitionId); + } + + @Override + public synchronized void becameInactive(Consumer consumer, int partitionId) { + activePtns.remove(partitionId); + inactivePtns.add(partitionId); + } + } + @Test public void testSimpleConsumerEventsWithoutPartition() throws Exception { final String topicName = "persistent://prop/use/ns-abc/failover-topic1"; final String subName = "sub1"; final int numMsgs = 100; + TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener(); ConsumerConfiguration consumerConf1 = new ConsumerConfiguration(); consumerConf1.setSubscriptionType(SubscriptionType.Failover); consumerConf1.setConsumerName("1"); + consumerConf1.setConsumerEventListener(listener1); + + TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener(); ConsumerConfiguration consumerConf2 = new ConsumerConfiguration(); consumerConf2.setSubscriptionType(SubscriptionType.Failover); consumerConf2.setConsumerName("2"); + consumerConf2.setConsumerEventListener(listener2); // 1. two consumers on the same subscription Consumer consumer1 = pulsarClient.subscribe(topicName, subName, consumerConf1); Consumer consumer2 = pulsarClient.subscribe(topicName, subName, consumerConf2); + verifyConsumerActive(listener1, -1); + verifyConsumerInactive(listener2, -1); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); PersistentSubscription subRef = topicRef.getSubscription(subName); @@ -147,6 +213,9 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { } consumer1.close(); Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME); + + verifyConsumerActive(listener2, -1); + verifyConsumerNotReceiveAnyStateChanges(listener1); for (int i = 5; i < numMsgs; i++) { msg = consumer2.receive(1, TimeUnit.SECONDS); Assert.assertNotNull(msg); @@ -195,9 +264,11 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { futures.clear(); // 7. consumer subscription should not send messages to the new consumer if its name is not highest in the list + TestConsumerStateEventListener listener3 = new TestConsumerStateEventListener(); ConsumerConfiguration consumerConf3 = new ConsumerConfiguration(); consumerConf3.setSubscriptionType(SubscriptionType.Failover); consumerConf3.setConsumerName("3"); + consumerConf3.setConsumerEventListener(listener3); for (int i = 0; i < 5; i++) { msg = consumer1.receive(1, TimeUnit.SECONDS); Assert.assertNotNull(msg); @@ -206,6 +277,9 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { } Consumer consumer3 = pulsarClient.subscribe(topicName, subName, consumerConf3); Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME); + + verifyConsumerInactive(listener3, -1); + Assert.assertNull(consumer3.receive(1, TimeUnit.SECONDS)); for (int i = 5; i < numMsgs; i++) { msg = consumer1.receive(1, TimeUnit.SECONDS); @@ -247,7 +321,7 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { admin.persistentTopics().delete(topicName); } - @Test(enabled = false) + @Test public void testSimpleConsumerEventsWithPartition() throws Exception { int numPartitions = 4; @@ -261,12 +335,18 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { ProducerConfiguration producerConf = new ProducerConfiguration(); producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); + + ActiveInactiveListenerEvent listener1 = new ActiveInactiveListenerEvent(); ConsumerConfiguration consumerConf1 = new ConsumerConfiguration(); consumerConf1.setSubscriptionType(SubscriptionType.Failover); consumerConf1.setConsumerName("1"); + consumerConf1.setConsumerEventListener(listener1); + + ActiveInactiveListenerEvent listener2 = new ActiveInactiveListenerEvent(); ConsumerConfiguration consumerConf2 = new ConsumerConfiguration(); consumerConf2.setSubscriptionType(SubscriptionType.Failover); consumerConf2.setConsumerName("2"); + consumerConf2.setConsumerEventListener(listener2); // 1. two consumers on the same subscription Consumer consumer1 = pulsarClient.subscribe(topicName, subName, consumerConf1); @@ -298,6 +378,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { // equal distribution between both consumers int totalMessages = 0; Message msg = null; + Set<Integer> receivedPtns = Sets.newHashSet(); while (true) { msg = consumer1.receive(1, TimeUnit.SECONDS); if (msg == null) { @@ -305,8 +386,16 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { } totalMessages++; consumer1.acknowledge(msg); + MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + receivedPtns.add(msgId.getPartitionIndex()); } + + assertTrue(Sets.difference(listener1.activePtns, receivedPtns).isEmpty()); + assertTrue(Sets.difference(listener2.inactivePtns, receivedPtns).isEmpty()); + Assert.assertEquals(totalMessages, numMsgs / 2); + + receivedPtns = Sets.newHashSet(); while (true) { msg = consumer2.receive(1, TimeUnit.SECONDS); if (msg == null) { @@ -314,7 +403,12 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { } totalMessages++; consumer2.acknowledge(msg); + MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + receivedPtns.add(msgId.getPartitionIndex()); } + assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty()); + assertTrue(Sets.difference(listener2.activePtns, receivedPtns).isEmpty()); + Assert.assertEquals(totalMessages, numMsgs); Assert.assertEquals(disp0.getActiveConsumer().consumerName(), consumerConf1.getConsumerName()); Assert.assertEquals(disp1.getActiveConsumer().consumerName(), consumerConf2.getConsumerName()); diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index fc773fedf..018f4ab88 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -921,6 +921,13 @@ void ClientConnection::handleIncomingCommand() { break; } + case BaseCommand::ACTIVE_CONSUMER_CHANGE: { + LOG_DEBUG(cnxString_ << "Received notification about active consumer changes"); + // ignore this message for now. + // TODO: @link{https://github.com/apache/incubator-pulsar/issues/1240} + break; + } + default: { LOG_WARN(cnxString_ << "Received invalid message from server"); close(); diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 70d61dfa1..e0245c555 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -381,6 +381,9 @@ std::string Commands::messageType(BaseCommand_Type type) { case BaseCommand::SEEK: return "SEEK"; break; + case BaseCommand::ACTIVE_CONSUMER_CHANGE: + return "ACTIVE_CONSUMER_CHANGE"; + break; }; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java index 76928bd1c..00e4537ad 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java @@ -46,6 +46,8 @@ private MessageListener messageListener; + private ConsumerEventListener consumerEventListener; + private int receiverQueueSize = 1000; private int maxTotalReceiverQueueSizeAcrossPartitions = 50000; @@ -131,6 +133,33 @@ public ConsumerConfiguration setMessageListener(MessageListener messageListener) return this; } + /** + * @return this configured {@link ConsumerEventListener} for the consumer. + * @see #setConsumerEventListener(ConsumerEventListener) + * @since 2.0 + */ + public ConsumerEventListener getConsumerEventListener() { + return this.consumerEventListener; + } + + /** + * Sets a {@link ConsumerEventListener} for the consumer. + * + * <p>The consumer group listener is used for receiving consumer state change in a consumer group for failover + * subscription. Application can then react to the consumer state changes. + * + * <p>This change is experimental. It is subject to changes coming in release 2.0. + * + * @param listener the consumer group listener object + * @return consumer configuration + * @since 2.0 + */ + public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener listener) { + checkNotNull(listener); + this.consumerEventListener = listener; + return this; + } + /** * @return the configure receiver queue size value */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java new file mode 100644 index 000000000..aeb8bbbb5 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * Listener on the consumer state changes. + */ +public interface ConsumerEventListener { + + /** + * Notified when the consumer group is changed, and the consumer becomes the active consumer. + */ + void becameActive(Consumer consumer, int partitionId); + + /** + * Notified when the consumer group is changed, and the consumer is still inactive or becomes inactive. + */ + void becameInactive(Consumer consumer, int partitionId); + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 58cdedeb6..43c49b2a9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -40,6 +40,7 @@ import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.PulsarHandler; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; @@ -252,6 +253,19 @@ protected void handleMessage(CommandMessage cmdMessage, ByteBuf headersAndPayloa } } + @Override + protected void handleActiveConsumerChange(CommandActiveConsumerChange change) { + checkArgument(state == State.Ready); + + if (log.isDebugEnabled()) { + log.debug("{} Received a consumer group change message from the server : {}", ctx.channel(), change); + } + ConsumerImpl consumer = consumers.get(change.getConsumerId()); + if (consumer != null) { + consumer.activeConsumerChanged(change.getIsActive()); + } + } + @Override protected void handleSuccess(CommandSuccess success) { checkArgument(state == State.Ready); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index be5e6584b..1886c7683 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; @@ -54,6 +55,7 @@ protected final String consumerName; protected final CompletableFuture<Consumer> subscribeFuture; protected final MessageListener listener; + protected final ConsumerEventListener consumerEventListener; protected final ExecutorService listenerExecutor; final BlockingQueue<Message> incomingMessages; protected final ConcurrentLinkedQueue<CompletableFuture<Message>> pendingReceives; @@ -68,6 +70,7 @@ protected ConsumerBase(PulsarClientImpl client, String topic, String subscriptio this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName(); this.subscribeFuture = subscribeFuture; this.listener = conf.getMessageListener(); + this.consumerEventListener = conf.getConsumerEventListener(); if (receiverQueueSize <= 1) { this.incomingMessages = Queues.newArrayBlockingQueue(1); } else { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a43689971..219479f53 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -747,6 +747,20 @@ private void failPendingReceive() { } } + void activeConsumerChanged(boolean isActive) { + if (consumerEventListener == null) { + return; + } + + listenerExecutor.submit(() -> { + if (isActive) { + consumerEventListener.becameActive(this, partitionIndex); + } else { + consumerEventListener.becameInactive(this, partitionIndex); + } + }); + } + void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java index 60e392804..92cbea2e5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java @@ -439,7 +439,9 @@ private ConsumerConfiguration getInternalConsumerConfig() { internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize()); internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType()); internalConsumerConfig.setConsumerName(consumerName); - + if (null != conf.getConsumerEventListener()) { + internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener()); + } int receiverQueueSize = Math.min(conf.getReceiverQueueSize(), conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions); internalConsumerConfig.setReceiverQueueSize(receiverQueueSize); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index f6bd759bb..029f712f3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -251,6 +251,13 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati "Read compacted can only be used with exclusive of failover persistent subscriptions")); } + if (conf.getConsumerEventListener() != null + && conf.getSubscriptionType() != SubscriptionType.Failover) { + return FutureUtil.failedFuture( + new PulsarClientException.InvalidConfigurationException( + "Active consumer listener is only supported for failover subscription")); + } + CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>(); getPartitionedTopicMetadata(topic).thenAccept(metadata -> { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 2912a737a..a395233cc 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -40,6 +40,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandError; import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow; @@ -360,6 +361,19 @@ public static ByteBuf newUnsubscribe(long consumerId, long requestId) { return res; } + public static ByteBuf newActiveConsumerChange(long consumerId, boolean isActive) { + CommandActiveConsumerChange.Builder changeBuilder = CommandActiveConsumerChange.newBuilder() + .setConsumerId(consumerId) + .setIsActive(isActive); + + CommandActiveConsumerChange change = changeBuilder.build(); + ByteBuf res = serializeWithSize( + BaseCommand.newBuilder().setType(Type.ACTIVE_CONSUMER_CHANGE).setActiveConsumerChange(change)); + changeBuilder.recycle(); + change.recycle(); + return res; + } + public static ByteBuf newSeek(long consumerId, long requestId, long ledgerId, long entryId) { CommandSeek.Builder seekBuilder = CommandSeek.newBuilder(); seekBuilder.setConsumerId(consumerId); @@ -956,4 +970,8 @@ public static ByteBuf newLookup(String topic, boolean authoritative, String orig public static boolean peerSupportsGetLastMessageId(int peerVersion) { return peerVersion >= ProtocolVersion.v12.getNumber(); } + + public static boolean peerSupportsActiveConsumerListener(int peerVersion) { + return peerVersion >= ProtocolVersion.v12.getNumber(); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java index 8d2389bf6..205619983 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java @@ -23,6 +23,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect; @@ -52,7 +53,6 @@ import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -266,6 +266,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse()); cmd.getGetLastMessageIdResponse().recycle(); break; + + case ACTIVE_CONSUMER_CHANGE: + handleActiveConsumerChange(cmd.getActiveConsumerChange()); + cmd.getActiveConsumerChange().recycle(); + break; } } finally { if (cmdBuilder != null) { @@ -350,6 +355,10 @@ protected void handleSeek(CommandSeek seek) { throw new UnsupportedOperationException(); } + protected void handleActiveConsumerChange(CommandActiveConsumerChange change) { + throw new UnsupportedOperationException(); + } + protected void handleSuccess(CommandSuccess success) { throw new UnsupportedOperationException(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 27cc63ae6..80df9b821 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -13624,6 +13624,393 @@ public Builder removeProperties(int index) { // @@protoc_insertion_point(class_scope:pulsar.proto.CommandAck) } + public interface CommandActiveConsumerChangeOrBuilder + extends com.google.protobuf.MessageLiteOrBuilder { + + // required uint64 consumer_id = 1; + boolean hasConsumerId(); + long getConsumerId(); + + // optional bool is_active = 2 [default = false]; + boolean hasIsActive(); + boolean getIsActive(); + } + public static final class CommandActiveConsumerChange extends + com.google.protobuf.GeneratedMessageLite + implements CommandActiveConsumerChangeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage { + // Use CommandActiveConsumerChange.newBuilder() to construct. + private final io.netty.util.Recycler.Handle<CommandActiveConsumerChange> handle; + private CommandActiveConsumerChange(io.netty.util.Recycler.Handle<CommandActiveConsumerChange> handle) { + this.handle = handle; + } + + private static final io.netty.util.Recycler<CommandActiveConsumerChange> RECYCLER = new io.netty.util.Recycler<CommandActiveConsumerChange>() { + protected CommandActiveConsumerChange newObject(Handle<CommandActiveConsumerChange> handle) { + return new CommandActiveConsumerChange(handle); + } + }; + + public void recycle() { + this.initFields(); + this.memoizedIsInitialized = -1; + this.bitField0_ = 0; + this.memoizedSerializedSize = -1; + handle.recycle(this); + } + + private CommandActiveConsumerChange(boolean noInit) { + this.handle = null; + } + + private static final CommandActiveConsumerChange defaultInstance; + public static CommandActiveConsumerChange getDefaultInstance() { + return defaultInstance; + } + + public CommandActiveConsumerChange getDefaultInstanceForType() { + return defaultInstance; + } + + private int bitField0_; + // required uint64 consumer_id = 1; + public static final int CONSUMER_ID_FIELD_NUMBER = 1; + private long consumerId_; + public boolean hasConsumerId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public long getConsumerId() { + return consumerId_; + } + + // optional bool is_active = 2 [default = false]; + public static final int IS_ACTIVE_FIELD_NUMBER = 2; + private boolean isActive_; + public boolean hasIsActive() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public boolean getIsActive() { + return isActive_; + } + + private void initFields() { + consumerId_ = 0L; + isActive_ = false; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasConsumerId()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + throw new RuntimeException("Cannot use CodedOutputStream"); + } + + public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeUInt64(1, consumerId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBool(2, isActive_); + } + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(1, consumerId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(2, isActive_); + } + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessageLite.Builder< + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange, Builder> + implements org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChangeOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder { + // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder() + private final io.netty.util.Recycler.Handle<Builder> handle; + private Builder(io.netty.util.Recycler.Handle<Builder> handle) { + this.handle = handle; + maybeForceBuilderInitialization(); + } + private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() { + protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) { + return new Builder(handle); + } + }; + + public void recycle() { + clear(); + handle.recycle(this); + } + + private void maybeForceBuilderInitialization() { + } + private static Builder create() { + return RECYCLER.get(); + } + + public Builder clear() { + super.clear(); + consumerId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); + isActive_ = false; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange getDefaultInstanceForType() { + return org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance(); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange build() { + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange buildPartial() { + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange result = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.RECYCLER.get(); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.consumerId_ = consumerId_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.isActive_ = isActive_; + result.bitField0_ = to_bitField0_; + return result; + } + + public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange other) { + if (other == org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance()) return this; + if (other.hasConsumerId()) { + setConsumerId(other.getConsumerId()); + } + if (other.hasIsActive()) { + setIsActive(other.getIsActive()); + } + return this; + } + + public final boolean isInitialized() { + if (!hasConsumerId()) { + + return false; + } + return true; + } + + public Builder mergeFrom(com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + throw new java.io.IOException("Merge from CodedInputStream is disabled"); + } + public Builder mergeFrom( + org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + + return this; + default: { + if (!input.skipField(tag)) { + + return this; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + consumerId_ = input.readUInt64(); + break; + } + case 16: { + bitField0_ |= 0x00000002; + isActive_ = input.readBool(); + break; + } + } + } + } + + private int bitField0_; + + // required uint64 consumer_id = 1; + private long consumerId_ ; + public boolean hasConsumerId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public long getConsumerId() { + return consumerId_; + } + public Builder setConsumerId(long value) { + bitField0_ |= 0x00000001; + consumerId_ = value; + + return this; + } + public Builder clearConsumerId() { + bitField0_ = (bitField0_ & ~0x00000001); + consumerId_ = 0L; + + return this; + } + + // optional bool is_active = 2 [default = false]; + private boolean isActive_ ; + public boolean hasIsActive() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public boolean getIsActive() { + return isActive_; + } + public Builder setIsActive(boolean value) { + bitField0_ |= 0x00000002; + isActive_ = value; + + return this; + } + public Builder clearIsActive() { + bitField0_ = (bitField0_ & ~0x00000002); + isActive_ = false; + + return this; + } + + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandActiveConsumerChange) + } + + static { + defaultInstance = new CommandActiveConsumerChange(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:pulsar.proto.CommandActiveConsumerChange) + } + public interface CommandFlowOrBuilder extends com.google.protobuf.MessageLiteOrBuilder { @@ -21017,6 +21404,10 @@ public Builder clearRequestId() { // optional .pulsar.proto.CommandGetLastMessageIdResponse getLastMessageIdResponse = 30; boolean hasGetLastMessageIdResponse(); org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getGetLastMessageIdResponse(); + + // optional .pulsar.proto.CommandActiveConsumerChange active_consumer_change = 31; + boolean hasActiveConsumerChange(); + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange getActiveConsumerChange(); } public static final class BaseCommand extends com.google.protobuf.GeneratedMessageLite @@ -21085,6 +21476,7 @@ public BaseCommand getDefaultInstanceForType() { SEEK(26, 28), GET_LAST_MESSAGE_ID(27, 29), GET_LAST_MESSAGE_ID_RESPONSE(28, 30), + ACTIVE_CONSUMER_CHANGE(29, 31), ; public static final int CONNECT_VALUE = 2; @@ -21116,6 +21508,7 @@ public BaseCommand getDefaultInstanceForType() { public static final int SEEK_VALUE = 28; public static final int GET_LAST_MESSAGE_ID_VALUE = 29; public static final int GET_LAST_MESSAGE_ID_RESPONSE_VALUE = 30; + public static final int ACTIVE_CONSUMER_CHANGE_VALUE = 31; public final int getNumber() { return value; } @@ -21151,6 +21544,7 @@ public static Type valueOf(int value) { case 28: return SEEK; case 29: return GET_LAST_MESSAGE_ID; case 30: return GET_LAST_MESSAGE_ID_RESPONSE; + case 31: return ACTIVE_CONSUMER_CHANGE; default: return null; } } @@ -21477,6 +21871,16 @@ public boolean hasGetLastMessageIdResponse() { return getLastMessageIdResponse_; } + // optional .pulsar.proto.CommandActiveConsumerChange active_consumer_change = 31; + public static final int ACTIVE_CONSUMER_CHANGE_FIELD_NUMBER = 31; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange activeConsumerChange_; + public boolean hasActiveConsumerChange() { + return ((bitField0_ & 0x40000000) == 0x40000000); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange getActiveConsumerChange() { + return activeConsumerChange_; + } + private void initFields() { type_ = org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT; connect_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance(); @@ -21508,6 +21912,7 @@ private void initFields() { seek_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance(); getLastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance(); getLastMessageIdResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance(); + activeConsumerChange_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -21680,6 +22085,12 @@ public final boolean isInitialized() { return false; } } + if (hasActiveConsumerChange()) { + if (!getActiveConsumerChange().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -21782,6 +22193,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x20000000) == 0x20000000)) { output.writeMessage(30, getLastMessageIdResponse_); } + if (((bitField0_ & 0x40000000) == 0x40000000)) { + output.writeMessage(31, activeConsumerChange_); + } } private int memoizedSerializedSize = -1; @@ -21910,6 +22324,10 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeMessageSize(30, getLastMessageIdResponse_); } + if (((bitField0_ & 0x40000000) == 0x40000000)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(31, activeConsumerChange_); + } memoizedSerializedSize = size; return size; } @@ -22083,6 +22501,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x10000000); getLastMessageIdResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance(); bitField0_ = (bitField0_ & ~0x20000000); + activeConsumerChange_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance(); + bitField0_ = (bitField0_ & ~0x40000000); return this; } @@ -22236,6 +22656,10 @@ public Builder clone() { to_bitField0_ |= 0x20000000; } result.getLastMessageIdResponse_ = getLastMessageIdResponse_; + if (((from_bitField0_ & 0x40000000) == 0x40000000)) { + to_bitField0_ |= 0x40000000; + } + result.activeConsumerChange_ = activeConsumerChange_; result.bitField0_ = to_bitField0_; return result; } @@ -22332,6 +22756,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.BaseComman if (other.hasGetLastMessageIdResponse()) { mergeGetLastMessageIdResponse(other.getGetLastMessageIdResponse()); } + if (other.hasActiveConsumerChange()) { + mergeActiveConsumerChange(other.getActiveConsumerChange()); + } return this; } @@ -22502,6 +22929,12 @@ public final boolean isInitialized() { return false; } } + if (hasActiveConsumerChange()) { + if (!getActiveConsumerChange().isInitialized()) { + + return false; + } + } return true; } @@ -22826,6 +23259,16 @@ public Builder mergeFrom( subBuilder.recycle(); break; } + case 250: { + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder(); + if (hasActiveConsumerChange()) { + subBuilder.mergeFrom(getActiveConsumerChange()); + } + input.readMessage(subBuilder, extensionRegistry); + setActiveConsumerChange(subBuilder.buildPartial()); + subBuilder.recycle(); + break; + } } } } @@ -24103,6 +24546,49 @@ public Builder clearGetLastMessageIdResponse() { return this; } + // optional .pulsar.proto.CommandActiveConsumerChange active_consumer_change = 31; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange activeConsumerChange_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance(); + public boolean hasActiveConsumerChange() { + return ((bitField0_ & 0x40000000) == 0x40000000); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange getActiveConsumerChange() { + return activeConsumerChange_; + } + public Builder setActiveConsumerChange(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange value) { + if (value == null) { + throw new NullPointerException(); + } + activeConsumerChange_ = value; + + bitField0_ |= 0x40000000; + return this; + } + public Builder setActiveConsumerChange( + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.Builder builderForValue) { + activeConsumerChange_ = builderForValue.build(); + + bitField0_ |= 0x40000000; + return this; + } + public Builder mergeActiveConsumerChange(org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange value) { + if (((bitField0_ & 0x40000000) == 0x40000000) && + activeConsumerChange_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance()) { + activeConsumerChange_ = + org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.newBuilder(activeConsumerChange_).mergeFrom(value).buildPartial(); + } else { + activeConsumerChange_ = value; + } + + bitField0_ |= 0x40000000; + return this; + } + public Builder clearActiveConsumerChange() { + activeConsumerChange_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance(); + + bitField0_ = (bitField0_ & ~0x40000000); + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index ca3d2fb3d..4a1112d0a 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -135,7 +135,8 @@ enum ProtocolVersion { v9 = 9; // Added end of topic notification v10 = 10;// Added proxy to broker v11 = 11;// C++ consumers before this version are not correctly handling the checksum field - v12 = 12;//Added get topic's last messageId from broker + v12 = 12;// Added get topic's last messageId from broker + // Added CommandActiveConsumerChange } message CommandConnect { @@ -324,6 +325,12 @@ message CommandAck { repeated KeyLongValue properties = 5; } +// changes on active consumer +message CommandActiveConsumerChange { + required uint64 consumer_id = 1; + optional bool is_active = 2 [default = false]; +} + message CommandFlow { required uint64 consumer_id = 1; @@ -500,6 +507,8 @@ message BaseCommand { GET_LAST_MESSAGE_ID = 29; GET_LAST_MESSAGE_ID_RESPONSE = 30; + + ACTIVE_CONSUMER_CHANGE = 31; } required Type type = 1; @@ -544,5 +553,6 @@ message BaseCommand { optional CommandGetLastMessageId getLastMessageId = 29; optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30; + optional CommandActiveConsumerChange active_consumer_change = 31; } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java new file mode 100644 index 000000000..c57a8c873 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/PulsarDecoderTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.api; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Unit test of {@link PulsarDecoder}. + */ +public class PulsarDecoderTest { + + private PulsarDecoder decoder; + + @BeforeMethod + public void setup() { + this.decoder = mock(PulsarDecoder.class, CALLS_REAL_METHODS); + } + + @Test + public void testChannelRead() throws Exception { + long consumerId = 1234L; + ByteBuf changeBuf = Commands.newActiveConsumerChange(consumerId, true); + ByteBuf cmdBuf = changeBuf.slice(4, changeBuf.writerIndex() - 4); + + doNothing().when(decoder).handleActiveConsumerChange(any(CommandActiveConsumerChange.class)); + decoder.channelRead(mock(ChannelHandlerContext.class), cmdBuf); + + verify(decoder, times(1)) + .handleActiveConsumerChange(any(CommandActiveConsumerChange.class)); + } + + +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services