This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dac1c6f39d17b4f64c5dcc8ccd342fc88ecbc2af Author: Xiaoyu Hou <anonhx...@gmail.com> AuthorDate: Mon Jun 6 10:59:55 2022 +0800 [fix][broker]Fast return if ack cumulative illegal (#15695) (cherry picked from commit 02dca31a78523a7d061ac1d6702ff6600a7f4ec4) --- .../org/apache/pulsar/broker/service/Consumer.java | 2 + .../broker/service/MessageCumulativeAckTest.java | 199 +++++++++++++++++++++ 2 files changed, 201 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index f7ed211fb2c..031574975d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -358,11 +358,13 @@ public class Consumer { if (ack.getAckType() == AckType.Cumulative) { if (ack.getMessageIdsCount() != 1) { log.warn("[{}] [{}] Received multi-message ack", subscription, consumerId); + return CompletableFuture.completedFuture(null); } if (Subscription.isIndividualAckMode(subType)) { log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", subscription, consumerId); + return CompletableFuture.completedFuture(null); } PositionImpl position = PositionImpl.earliest; if (ack.getMessageIdsCount() == 1) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java new file mode 100644 index 00000000000..86754efc0c2 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static java.util.Collections.emptyMap; +import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.apache.pulsar.common.api.proto.CommandAck.AckType.Cumulative; +import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive; +import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Failover; +import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Key_Shared; +import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared; +import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.net.InetSocketAddress; +import java.util.Optional; +import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import 
org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Checks how {@link Consumer#messageAcked(CommandAck)} treats cumulative acknowledgements.
 *
 * <p>Each test builds a {@link Consumer} on top of a spied {@link PersistentSubscription} whose
 * {@code acknowledgeMessage} is stubbed out, sends a cumulative {@link CommandAck}, and then
 * verifies whether the subscription was asked to acknowledge anything.
 */
@Test(groups = "broker")
public class MessageCumulativeAckTest {
    private final int consumerId = 1;
    private BrokerService brokerService;
    private ServerCnx serverCnx;
    private MetadataStore store;
    protected PulsarService pulsar;
    private OrderedExecutor executor;
    private EventLoopGroup eventLoopGroup;
    private PersistentSubscription sub;

    /**
     * Builds the spied/mocked broker fixtures and the spied subscription used by every test.
     *
     * @throws Exception if any fixture cannot be created
     */
    @BeforeMethod
    public void setup() throws Exception {
        executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-cumulative-ack-test").build();
        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
        svcConfig.setBrokerShutdownTimeoutMs(0L);
        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
        svcConfig.setClusterName("pulsar-cluster");
        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
        doReturn(svcConfig).when(pulsar).getConfiguration();

        ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
        doReturn(TransactionTestBase.createMockBookKeeper(executor))
                .when(pulsar).getBookKeeperClient();

        // A single in-memory metadata store backs both the local and the configuration store.
        store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());
        doReturn(store).when(pulsar).getLocalMetadataStore();
        doReturn(store).when(pulsar).getConfigurationMetadataStore();

        PulsarResources pulsarResources = new PulsarResources(store, store);
        doReturn(pulsarResources).when(pulsar).getPulsarResources();

        serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
        doReturn(true).when(serverCnx).isActive();
        doReturn(true).when(serverCnx).isWritable();
        doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
        when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
        when(serverCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
        doReturn(new PulsarCommandSenderImpl(null, serverCnx))
                .when(serverCnx).getCommandSender();

        eventLoopGroup = new NioEventLoopGroup();
        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
        doReturn(brokerService).when(pulsar).getBrokerService();

        String topicName = TopicName.get("MessageCumulativeAckTest").toString();
        PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), brokerService);
        sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
                mock(ManagedCursorImpl.class), false));
        // The tests only count invocations; the real acknowledge path must not run.
        doNothing().when(sub).acknowledgeMessage(any(), any(), any());
    }

    /**
     * Releases every fixture created by {@link #setup()}.
     *
     * <p>Because of {@code alwaysRun = true} this also runs after a {@code setup()} that failed
     * partway, so every field is null-checked before use and cleared afterwards; that way a
     * partially initialised fixture neither triggers a {@link NullPointerException} here nor gets
     * closed a second time by a later invocation.
     *
     * @throws Exception if closing one of the fixtures fails
     */
    @AfterMethod(alwaysRun = true)
    public void shutdown() throws Exception {
        if (brokerService != null) {
            brokerService.close();
            brokerService = null;
        }
        if (pulsar != null) {
            pulsar.close();
            pulsar = null;
        }
        if (executor != null) {
            executor.shutdown();
            executor = null;
        }
        if (eventLoopGroup != null) {
            eventLoopGroup.shutdownGracefully().get();
            eventLoopGroup = null;
        }
        if (store != null) {
            store.close();
            store = null;
        }
        sub = null;
    }

    /** Subscription types for which a cumulative ack is expected to be ignored. */
    @DataProvider(name = "individualAckModes")
    public static Object[][] individualAckModes() {
        return new Object[][]{
                {Shared},
                {Key_Shared},
        };
    }

    /** Subscription types for which a cumulative ack is expected to reach the subscription. */
    @DataProvider(name = "notIndividualAckModes")
    public static Object[][] notIndividualAckModes() {
        return new Object[][]{
                {Exclusive},
                {Failover},
        };
    }

    /** A cumulative ack on a Shared/Key_Shared consumer must not reach the subscription. */
    @Test(timeOut = 5000, dataProvider = "individualAckModes")
    public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
        Consumer consumer = newConsumer(subType);
        CommandAck commandAck = newCumulativeAck(1L);

        consumer.messageAcked(commandAck).get();
        verify(sub, never()).acknowledgeMessage(any(), any(), any());
    }

    /** A single-id cumulative ack on an Exclusive/Failover consumer is forwarded exactly once. */
    @Test(timeOut = 5000, dataProvider = "notIndividualAckModes")
    public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
        Consumer consumer = newConsumer(subType);
        CommandAck commandAck = newCumulativeAck(1L);

        consumer.messageAcked(commandAck).get();
        verify(sub, times(1)).acknowledgeMessage(any(), any(), any());
    }

    /**
     * A cumulative ack carrying more than one message id must not reach the subscription.
     *
     * <p>Despite the "MoreThanNone" in the method name, the command built here carries two
     * message ids; the name is kept unchanged.
     */
    @Test(timeOut = 5000)
    public void testAckWithMoreThanNoneMessageIds() throws Exception {
        Consumer consumer = newConsumer(Failover);
        CommandAck commandAck = newCumulativeAck(1L, 2L);

        consumer.messageAcked(commandAck).get();
        verify(sub, never()).acknowledgeMessage(any(), any(), any());
    }

    /** Creates a consumer of the given subscription type bound to the spied subscription. */
    private Consumer newConsumer(CommandSubscribe.SubType subType) throws Exception {
        return new Consumer(sub, subType, "topic-1", consumerId, 0,
                "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
                MessageId.latest, DEFAULT_CONSUMER_EPOCH);
    }

    /** Builds a cumulative ack for this consumer with one message id (entry 0) per ledger id. */
    private CommandAck newCumulativeAck(long... ledgerIds) {
        CommandAck commandAck = new CommandAck();
        commandAck.setAckType(Cumulative);
        commandAck.setConsumerId(consumerId);
        for (long ledgerId : ledgerIds) {
            commandAck.addMessageId().setEntryId(0L).setLedgerId(ledgerId);
        }
        return commandAck;
    }
}