This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 41d7cf1dab225793c407a5604f1531c5d2956375 Author: fengyubiao <[email protected]> AuthorDate: Thu Jul 28 09:45:11 2022 +0800 [fix][flaky-test] Fix ClassCastException: BrokerService cannot be cast to class PulsarResources (#16821) (cherry picked from commit 4ee346693df8fc8314f94d53b00283f1c6079dc1) --- .../pulsar/broker/PulsarServiceMockSupport.java | 75 ++++++++++++++++++++++ .../broker/service/MessageCumulativeAckTest.java | 9 ++- .../PersistentDispatcherFailoverConsumerTest.java | 29 ++++++--- .../pulsar/broker/service/PersistentTopicTest.java | 31 +++++---- .../broker/service/ServerCnxAuthorizationTest.java | 16 +++-- .../metadata/impl/AbstractMetadataStore.java | 3 +- 6 files changed, 135 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java new file mode 100644 index 00000000000..e871919d882 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import static org.mockito.Mockito.mock; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.AbstractMetadataStore; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +@Slf4j +public class PulsarServiceMockSupport { + + /** + * see: https://github.com/apache/pulsar/pull/16821. + * While executing "doReturn(pulsarResources).when(pulsar).getPulsarResources()", Meta Store Thread also accesses + * variable PulsarService.getPulsarResources() asynchronously in logic: "notification by zk-watcher". + * So execute mock-cmd in meta-thread (The executor of MetaStore is a single thread pool executor, so all things + * will be thread safety). + * Note: If the MetaStore's executor is no longer single-threaded, should mock as single-threaded if you need to + * execute this method. + */ + public static void mockPulsarServiceProps(final PulsarService pulsarService, Runnable mockTask) + throws ExecutionException, InterruptedException, TimeoutException { + final CompletableFuture<Void> mockGetPulsarResourceFuture = new CompletableFuture<>(); + MetadataStoreExtended metadataStoreExtended = pulsarService.getLocalMetadataStore(); + if (metadataStoreExtended instanceof AbstractMetadataStore){ + AbstractMetadataStore abstractMetadataStore = (AbstractMetadataStore) metadataStoreExtended; + abstractMetadataStore.execute(() -> { + mockTask.run(); + mockGetPulsarResourceFuture.complete(null); + }, mock(CompletableFuture.class)); + try { + mockGetPulsarResourceFuture.get(1, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException){ + mockTask.run(); + } + } else { + mockTask.run(); + } + } + + @Test + public void testMockMetaStore() throws Exception{ + AtomicInteger integer = new AtomicInteger(); + PulsarService pulsarService = Mockito.mock(PulsarService.class); + Mockito.when(pulsarService.getLocalMetadataStore()).thenReturn(Mockito.mock(ZKMetadataStore.class)); + mockPulsarServiceProps(pulsarService, () -> integer.incrementAndGet()); + Assert.assertEquals(integer.get(), 1); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index 2b67a58e43d..aada2066556 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.PulsarServiceMockSupport; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -94,7 +95,9 @@ public class MessageCumulativeAckTest { doReturn(store).when(pulsar).getConfigurationMetadataStore(); PulsarResources pulsarResources = new PulsarResources(store, store); - doReturn(pulsarResources).when(pulsar).getPulsarResources(); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(pulsarResources).when(pulsar).getPulsarResources(); + }); serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); doReturn(true).when(serverCnx).isActive(); @@ -107,7 +110,9 @@ public class MessageCumulativeAckTest { eventLoopGroup = new NioEventLoopGroup(); brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); - doReturn(brokerService).when(pulsar).getBrokerService(); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(brokerService).when(pulsar).getBrokerService(); + }); String topicName = TopicName.get("MessageCumulativeAckTest").toString(); PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), brokerService); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 177aaefeb9d..31e6f5579b0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -64,6 +64,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.PulsarServiceMockSupport; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.PulsarResources; @@ -120,24 +121,32 @@ public class PersistentDispatcherFailoverConsumerTest { svcConfig.setBrokerShutdownTimeoutMs(0L); svcConfig.setClusterName("pulsar-cluster"); pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); - doReturn(svcConfig).when(pulsar).getConfiguration(); + store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build()); + doReturn(store).when(pulsar).getLocalMetadataStore(); + doReturn(store).when(pulsar).getConfigurationMetadataStore(); + + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(svcConfig).when(pulsar).getConfiguration(); + }); mlFactoryMock = mock(ManagedLedgerFactory.class); - doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); + }); doReturn(TransactionTestBase.createMockBookKeeper(executor)) .when(pulsar).getBookKeeperClient(); eventLoopGroup = new NioEventLoopGroup(); - store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build()); - doReturn(store).when(pulsar).getLocalMetadataStore(); - doReturn(store).when(pulsar).getConfigurationMetadataStore(); - PulsarResources pulsarResources = new PulsarResources(store, store); - doReturn(pulsarResources).when(pulsar).getPulsarResources(); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(pulsarResources).when(pulsar).getPulsarResources(); + }); brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); - doReturn(brokerService).when(pulsar).getBrokerService(); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(brokerService).when(pulsar).getBrokerService(); + }); consumerChanges = new LinkedBlockingQueue<>(); this.channelCtx = mock(ChannelHandlerContext.class); @@ -183,7 +192,9 @@ public class PersistentDispatcherFailoverConsumerTest { .when(serverCnxWithOldVersion).getCommandSender(); NamespaceService nsSvc = mock(NamespaceService.class); - doReturn(nsSvc).when(pulsar).getNamespaceService(); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(nsSvc).when(pulsar).getNamespaceService(); + }); doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class)); doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class)); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index ae67646ea3a..24b056d3a56 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -98,6 +98,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.PulsarServiceMockSupport; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.NamespaceResources; @@ -192,27 +193,29 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { deleteLedgerCallback.deleteLedgerComplete(null); return null; }).when(mlFactoryMock).asyncDelete(any(), any(), any()); - + // Mock metaStore. ZooKeeper mockZk = createMockZooKeeper(); doReturn(createMockBookKeeper(executor)) .when(pulsar).getBookKeeperClient(); - doReturn(executor).when(pulsar).getOrderedExecutor(); - store = new ZKMetadataStore(mockZk); + doReturn(store).when(pulsar).getLocalMetadataStore(); + doReturn(store).when(pulsar).getConfigurationMetadataStore(); + // Mock pulsarResources. PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store); NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30); TopicResources tsr = spyWithClassAndConstructorArgs(TopicResources.class, store); doReturn(nsr).when(pulsarResources).getNamespaceResources(); doReturn(tsr).when(pulsarResources).getTopicResources(); - doReturn(pulsarResources).when(pulsar).getPulsarResources(); - - doReturn(store).when(pulsar).getLocalMetadataStore(); - doReturn(store).when(pulsar).getConfigurationMetadataStore(); - + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(pulsarResources).when(pulsar).getPulsarResources(); + }); + // Mock brokerService. brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); - doReturn(brokerService).when(pulsar).getBrokerService(); - + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(brokerService).when(pulsar).getBrokerService(); + }); + // Mock serviceCnx. serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar); doReturn(true).when(serverCnx).isActive(); doReturn(true).when(serverCnx).isWritable(); @@ -228,7 +231,9 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { NamespaceService nsSvc = mock(NamespaceService.class); NamespaceBundle bundle = mock(NamespaceBundle.class); doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any()); - doReturn(nsSvc).when(pulsar).getNamespaceService(); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(nsSvc).when(pulsar).getNamespaceService(); + }); doReturn(true).when(nsSvc).isServiceUnitOwned(any()); doReturn(true).when(nsSvc).isServiceUnitActive(any()); doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any()); @@ -2311,7 +2316,9 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { PulsarResources pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store); NamespaceResources nsr = spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30); doReturn(nsr).when(pulsarResources).getNamespaceResources(); - doReturn(pulsarResources).when(pulsar).getPulsarResources(); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(pulsarResources).when(pulsar).getPulsarResources(); + }); CompletableFuture<Optional<Policies>> policiesFuture = new CompletableFuture<>(); Policies policies = new Policies(); Set<String> namespaceClusters = new HashSet<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java index af539891711..6d108ce675d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java @@ -53,6 +53,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.PulsarServiceMockSupport; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; @@ -116,7 +117,10 @@ public class ServerCnxAuthorizationTest { doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService(); doReturn(svcConfig).when(pulsar).getConfiguration(); - doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources(); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources(); + }); + ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class); doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory(); @@ -132,7 +136,9 @@ public class ServerCnxAuthorizationTest { doReturn(store).when(pulsar).getConfigurationMetadataStore(); pulsarResources = spyWithClassAndConstructorArgs(PulsarResources.class, store, store); - doReturn(pulsarResources).when(pulsar).getPulsarResources(); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(pulsarResources).when(pulsar).getPulsarResources(); + }); NamespaceResources namespaceResources = spyWithClassAndConstructorArgs(NamespaceResources.class, store, store, 30); doReturn(namespaceResources).when(pulsarResources).getNamespaceResources(); @@ -146,8 +152,10 @@ public class ServerCnxAuthorizationTest { brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup); BrokerInterceptor interceptor = mock(BrokerInterceptor.class); doReturn(interceptor).when(brokerService).getInterceptor(); - doReturn(brokerService).when(pulsar).getBrokerService(); - doReturn(executor).when(pulsar).getOrderedExecutor(); + PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> { + doReturn(brokerService).when(pulsar).getBrokerService(); + doReturn(executor).when(pulsar).getOrderedExecutor(); + }); } @Test diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index cac386405a6..a116ea9c294 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -316,7 +316,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co /** * Run the task in the executor thread and fail the future if the executor is shutting down. */ - protected void execute(Runnable task, CompletableFuture<?> future) { + @VisibleForTesting + public void execute(Runnable task, CompletableFuture<?> future) { try { executor.execute(task); } catch (Throwable t) {
