This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a9d4ac85d8 [improve][test] Support decorating topic, subscription, 
dispatcher, ManagedLedger and ManagedCursors instances in tests (#23892)
2a9d4ac85d8 is described below

commit 2a9d4ac85d8d786979afaa0b965cdb27375ae969
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Jan 25 10:59:01 2025 +0200

    [improve][test] Support decorating topic, subscription, dispatcher, 
ManagedLedger and ManagedCursors instances in tests (#23892)
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  15 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  10 +-
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  14 +-
 .../apache/pulsar/broker/service/TopicFactory.java |   5 +
 .../service/persistent/PersistentSubscription.java | 139 ++++++++------
 .../broker/service/persistent/PersistentTopic.java |  12 +-
 .../testinterceptor/BrokerTestInterceptor.java     | 212 +++++++++++++++++++++
 7 files changed, 333 insertions(+), 74 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index f546a487f84..12c3ea12df5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -410,11 +410,8 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
                             new 
EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
                                     
config.getBookKeeperEnsemblePlacementPolicyProperties()))
                     .thenAccept(bk -> {
-                        final ManagedLedgerImpl newledger = 
config.getShadowSource() == null
-                                ? new ManagedLedgerImpl(this, bk, store, 
config, scheduledExecutor, name,
-                                mlOwnershipChecker)
-                                : new ShadowManagedLedgerImpl(this, bk, store, 
config, scheduledExecutor, name,
-                                mlOwnershipChecker);
+                        final ManagedLedgerImpl newledger =
+                                createManagedLedger(bk, store, name, config, 
mlOwnershipChecker);
                         PendingInitializeManagedLedger pendingLedger = new 
PendingInitializeManagedLedger(newledger);
                         pendingInitializeLedgers.put(name, pendingLedger);
                         newledger.initialize(new 
ManagedLedgerInitializeLedgerCallback() {
@@ -472,6 +469,14 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
         });
     }
 
+    protected ManagedLedgerImpl createManagedLedger(BookKeeper bk, MetaStore 
store, String name,
+                                                    ManagedLedgerConfig config,
+                                                    
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
+        return config.getShadowSource() == null
+                ? new ManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name, mlOwnershipChecker) :
+                new ShadowManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name, mlOwnershipChecker);
+    }
+
     @Override
     public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
                               AsyncCallbacks.OpenReadOnlyManagedLedgerCallback 
callback,
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 966aa068f2f..4f45fc67b63 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -632,7 +632,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                     for (final String cursorName : consumers) {
                         log.info("[{}] Loading cursor {}", name, cursorName);
                         final ManagedCursorImpl cursor;
-                        cursor = new ManagedCursorImpl(bookKeeper, 
ManagedLedgerImpl.this, cursorName);
+                        cursor = 
createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName);
 
                         cursor.recover(new VoidCallback() {
                             @Override
@@ -663,7 +663,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                             log.debug("[{}] Recovering cursor {} lazily", 
name, cursorName);
                         }
                         final ManagedCursorImpl cursor;
-                        cursor = new ManagedCursorImpl(bookKeeper, 
ManagedLedgerImpl.this, cursorName);
+                        cursor = 
createCursor(ManagedLedgerImpl.this.bookKeeper, cursorName);
                         CompletableFuture<ManagedCursor> cursorRecoveryFuture 
= new CompletableFuture<>();
                         uninitializedCursors.put(cursorName, 
cursorRecoveryFuture);
 
@@ -1007,7 +1007,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Creating new cursor: {}", name, cursorName);
         }
-        final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, 
this, cursorName);
+        final ManagedCursorImpl cursor = createCursor(bookKeeper, cursorName);
         CompletableFuture<ManagedCursor> cursorFuture = new 
CompletableFuture<>();
         uninitializedCursors.put(cursorName, cursorFuture);
         Position position = InitialPosition.Earliest == initialPosition ? 
getFirstPosition() : getLastPosition();
@@ -1039,6 +1039,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         });
     }
 
+    protected ManagedCursorImpl createCursor(BookKeeper bookKeeper, String 
cursorName) {
+        return new ManagedCursorImpl(bookKeeper, this, cursorName);
+    }
+
     @Override
     public synchronized void asyncDeleteCursor(final String consumerName, 
final DeleteCursorCallback callback,
             final Object ctx) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 737bc69bf24..3d945afe4c1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -116,8 +116,8 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
 
         try {
             this.managedLedgerFactory =
-                    new ManagedLedgerFactoryImpl(metadataStore, bkFactory, 
managedLedgerFactoryConfig, statsLogger,
-                            openTelemetry);
+                    createManagedLedgerFactory(metadataStore, openTelemetry, 
bkFactory, managedLedgerFactoryConfig,
+                            statsLogger);
         } catch (Exception e) {
             statsProvider.stop();
             defaultBkClient.close();
@@ -147,6 +147,16 @@ public class ManagedLedgerClientFactory implements 
ManagedLedgerStorage {
         };
     }
 
+    protected ManagedLedgerFactoryImpl 
createManagedLedgerFactory(MetadataStoreExtended metadataStore,
+                                                                  
OpenTelemetry openTelemetry,
+                                                                  
BookkeeperFactoryForCustomEnsemblePlacementPolicy
+                                                                          
bkFactory,
+                                                                  
ManagedLedgerFactoryConfig managedLedgerFactoryConfig,
+                                                                  StatsLogger 
statsLogger) throws Exception {
+        return new ManagedLedgerFactoryImpl(metadataStore, bkFactory, 
managedLedgerFactoryConfig, statsLogger,
+                openTelemetry);
+    }
+
     @Override
     public Collection<ManagedLedgerStorageClass> getStorageClasses() {
         return List.of(getDefaultStorageClass());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
index f8bac9d8134..523f995cc5d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import java.io.Closeable;
+import java.io.IOException;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 
 /**
@@ -28,4 +29,8 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 public interface TopicFactory extends Closeable {
 
     <T extends Topic> T create(String topic, ManagedLedger ledger, 
BrokerService brokerService, Class<T> topicClazz);
+
+    default void close() throws IOException {
+        // no-op by default; implementations that hold resources can override this
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a96a7e75506..275d1ae5818 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -250,70 +250,10 @@ public class PersistentSubscription extends 
AbstractSubscription {
                 }
 
                 if (dispatcher == null || !dispatcher.isConsumerConnected()) {
-                    Dispatcher previousDispatcher = null;
-                    switch (consumer.subType()) {
-                        case Exclusive:
-                            if (dispatcher == null || dispatcher.getType() != 
SubType.Exclusive) {
-                                previousDispatcher = dispatcher;
-                                dispatcher = new 
PersistentDispatcherSingleActiveConsumer(
-                                        cursor, SubType.Exclusive, 0, topic, 
this);
-                            }
-                            break;
-                        case Shared:
-                            if (dispatcher == null || dispatcher.getType() != 
SubType.Shared) {
-                                previousDispatcher = dispatcher;
-                                if 
(config.isSubscriptionSharedUseClassicPersistentImplementation()) {
-                                    dispatcher = new 
PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
-                                } else {
-                                    dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursor, this);
-                                }
-                            }
-                            break;
-                        case Failover:
-                            int partitionIndex = 
TopicName.getPartitionIndex(topicName);
-                            if (partitionIndex < 0) {
-                                // For non partition topics, use a negative 
index so
-                                // dispatcher won't sort consumers before 
picking
-                                // an active consumer for the topic.
-                                partitionIndex = -1;
-                            }
-
-                            if (dispatcher == null || dispatcher.getType() != 
SubType.Failover) {
-                                previousDispatcher = dispatcher;
-                                dispatcher = new 
PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
-                                                partitionIndex, topic, this);
-                            }
-                            break;
-                        case Key_Shared:
-                            KeySharedMeta ksm = consumer.getKeySharedMeta();
-                            if (dispatcher == null || dispatcher.getType() != 
SubType.Key_Shared
-                                    || !((StickyKeyDispatcher) dispatcher)
-                                    .hasSameKeySharedPolicy(ksm)) {
-                                previousDispatcher = dispatcher;
-                                if 
(config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
-                                    dispatcher =
-                                            new 
PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
-                                                    this,
-                                                    
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
-                                } else {
-                                    dispatcher = new 
PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
-                                            
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
-                                }
-                            }
-                            break;
-                        default:
-                            return FutureUtil.failedFuture(
-                                    new ServerMetadataException("Unsupported 
subscription type"));
-                    }
-
-                    if (previousDispatcher != null) {
-                        previousDispatcher.close().thenRun(() -> {
-                            log.info("[{}][{}] Successfully closed previous 
dispatcher", topicName, subName);
-                        }).exceptionally(ex -> {
-                            log.error("[{}][{}] Failed to close previous 
dispatcher", topicName, subName, ex);
-                            return null;
-                        });
+                    if (consumer.subType() == null) {
+                        return FutureUtil.failedFuture(new 
ServerMetadataException("Unsupported subscription type"));
                     }
+                    dispatcher = reuseOrCreateDispatcher(dispatcher, consumer);
                 } else {
                     Optional<CompletableFuture<Void>> compatibilityError =
                             
checkForConsumerCompatibilityErrorWithDispatcher(dispatcher, consumer);
@@ -327,6 +267,79 @@ public class PersistentSubscription extends 
AbstractSubscription {
         });
     }
 
+    /**
+     * Create a new dispatcher or reuse the existing one when it's compatible 
with the new consumer.
+     * This protected method can be overridden for testing purposes to 
inject test dispatcher instances with
+     * special behaviors.
+     * @param dispatcher the existing dispatcher
+     * @param consumer the new consumer
+     * @return the dispatcher to use, either the existing one or a new one
+     */
+    protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher, 
Consumer consumer) {
+        Dispatcher previousDispatcher = null;
+        switch (consumer.subType()) {
+            case Exclusive:
+                if (dispatcher == null || dispatcher.getType() != 
SubType.Exclusive) {
+                    previousDispatcher = dispatcher;
+                    dispatcher = new PersistentDispatcherSingleActiveConsumer(
+                            cursor, SubType.Exclusive, 0, topic, this);
+                }
+                break;
+            case Shared:
+                if (dispatcher == null || dispatcher.getType() != 
SubType.Shared) {
+                    previousDispatcher = dispatcher;
+                    if 
(config.isSubscriptionSharedUseClassicPersistentImplementation()) {
+                        dispatcher = new 
PersistentDispatcherMultipleConsumersClassic(topic, cursor, this);
+                    } else {
+                        dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursor, this);
+                    }
+                }
+                break;
+            case Failover:
+                int partitionIndex = TopicName.getPartitionIndex(topicName);
+                if (partitionIndex < 0) {
+                    // For non partition topics, use a negative index so
+                    // dispatcher won't sort consumers before picking
+                    // an active consumer for the topic.
+                    partitionIndex = -1;
+                }
+
+                if (dispatcher == null || dispatcher.getType() != 
SubType.Failover) {
+                    previousDispatcher = dispatcher;
+                    dispatcher = new 
PersistentDispatcherSingleActiveConsumer(cursor, SubType.Failover,
+                            partitionIndex, topic, this);
+                }
+                break;
+            case Key_Shared:
+                KeySharedMeta ksm = consumer.getKeySharedMeta();
+                if (dispatcher == null || dispatcher.getType() != 
SubType.Key_Shared
+                        || !((StickyKeyDispatcher) dispatcher)
+                        .hasSameKeySharedPolicy(ksm)) {
+                    previousDispatcher = dispatcher;
+                    if 
(config.isSubscriptionKeySharedUseClassicPersistentImplementation()) {
+                        dispatcher =
+                                new 
PersistentStickyKeyDispatcherMultipleConsumersClassic(topic, cursor,
+                                        this, config, ksm);
+                    } else {
+                        dispatcher = new 
PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
+                                config, ksm);
+                    }
+                }
+                break;
+        }
+
+        if (previousDispatcher != null) {
+            previousDispatcher.close().thenRun(() -> {
+                log.info("[{}][{}] Successfully closed previous dispatcher", 
topicName, subName);
+            }).exceptionally(ex -> {
+                log.error("[{}][{}] Failed to close previous dispatcher", 
topicName, subName, ex);
+                return null;
+            });
+        }
+
+        return dispatcher;
+    }
+
     @Override
     public synchronized void removeConsumer(Consumer consumer, boolean 
isResetCursor) throws BrokerServiceException {
         cursor.updateLastActive();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2325c8286a1..e920c483bb3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -586,7 +586,17 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         });
     }
 
-    private PersistentSubscription createPersistentSubscription(String 
subscriptionName, ManagedCursor cursor,
+
+    /**
+     * Create a new subscription instance for the topic.
+     * This protected method can be overridden in tests to return a special 
test implementation instance.
+     * @param subscriptionName the name of the subscription
+     * @param cursor the cursor to use for the subscription
+     * @param replicated the subscription replication flag
+     * @param subscriptionProperties the subscription properties
+     * @return the subscription instance
+     */
+    protected PersistentSubscription createPersistentSubscription(String 
subscriptionName, ManagedCursor cursor,
             Boolean replicated, Map<String, String> subscriptionProperties) {
         requireNonNull(topicCompactionService);
         if (isCompactionSubscription(subscriptionName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java
new file mode 100644
index 00000000000..a1549b2cb86
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testinterceptor/BrokerTestInterceptor.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testinterceptor;
+
+import io.opentelemetry.api.OpenTelemetry;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.MetaStore;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ManagedLedgerClientFactory;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.TopicFactory;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+/**
+ * A test interceptor for broker tests that allows decorating persistent 
topics, subscriptions, dispatchers,
+ * managed ledger factory, managed ledger and managed cursor instances.
+ */
+public class BrokerTestInterceptor {
+    public static final BrokerTestInterceptor INSTANCE = new 
BrokerTestInterceptor();
+
+    // Private constructor: the only instance is the INSTANCE singleton
+    private BrokerTestInterceptor() {
+
+    }
+
+    public static class TestTopicFactory implements TopicFactory {
+        @Override
+        public <T extends Topic> T create(String topic, ManagedLedger ledger, 
BrokerService brokerService,
+                                          Class<T> topicClazz) {
+            if (!topicClazz.isAssignableFrom(PersistentTopic.class)) {
+                throw new UnsupportedOperationException("Unsupported topic 
class");
+            }
+            return topicClazz.cast(
+                    INSTANCE.getPersistentTopicDecorator().apply(new 
TestTopic(topic, ledger, brokerService)));
+        }
+    }
+
+    static class TestTopic extends PersistentTopic {
+
+        public TestTopic(String topic, ManagedLedger ledger, BrokerService 
brokerService) {
+            super(topic, ledger, brokerService);
+        }
+
+        @Override
+        protected PersistentSubscription createPersistentSubscription(String 
subscriptionName, ManagedCursor cursor,
+                                                                      Boolean 
replicated,
+                                                                      
Map<String, String> subscriptionProperties) {
+            return INSTANCE.getPersistentSubscriptionDecorator()
+                    .apply(new TestSubscription(this, subscriptionName, 
cursor, replicated, subscriptionProperties));
+        }
+    }
+
+    static class TestSubscription extends PersistentSubscription {
+        public TestSubscription(PersistentTopic topic, String 
subscriptionName, ManagedCursor cursor,
+                                Boolean replicated,
+                                Map<String, String> subscriptionProperties) {
+            super(topic, subscriptionName, cursor, replicated, 
subscriptionProperties);
+        }
+
+        @Override
+        protected Dispatcher reuseOrCreateDispatcher(Dispatcher dispatcher,
+                                                     Consumer consumer) {
+            Dispatcher previousInstance = dispatcher;
+            dispatcher = super.reuseOrCreateDispatcher(dispatcher, consumer);
+            if (dispatcher != previousInstance) {
+                dispatcher = 
INSTANCE.getDispatcherDecorator().apply(dispatcher);
+            }
+            return dispatcher;
+        }
+    }
+
+    public static class TestManagedLedgerStorage extends 
ManagedLedgerClientFactory {
+        @Override
+        protected ManagedLedgerFactoryImpl 
createManagedLedgerFactory(MetadataStoreExtended metadataStore,
+                                                                      
OpenTelemetry openTelemetry,
+                                                                      
ManagedLedgerFactoryImpl.BookkeeperFactoryForCustomEnsemblePlacementPolicy 
bkFactory,
+                                                                      
ManagedLedgerFactoryConfig managedLedgerFactoryConfig,
+                                                                      
StatsLogger statsLogger) throws Exception {
+            return INSTANCE.managedLedgerFactoryDecorator.apply(
+                    new TestManagedLedgerFactoryImpl(metadataStore, bkFactory, 
managedLedgerFactoryConfig, statsLogger,
+                            openTelemetry));
+        }
+    }
+
+    static class TestManagedLedgerFactoryImpl extends ManagedLedgerFactoryImpl 
{
+        public TestManagedLedgerFactoryImpl(MetadataStoreExtended 
metadataStore,
+                                            
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
+                                            ManagedLedgerFactoryConfig config, 
StatsLogger statsLogger,
+                                            OpenTelemetry openTelemetry) 
throws Exception {
+            super(metadataStore, bookKeeperGroupFactory, config, statsLogger, 
openTelemetry);
+        }
+
+        @Override
+        protected ManagedLedgerImpl createManagedLedger(BookKeeper bk, 
MetaStore store, String name,
+                                                        ManagedLedgerConfig 
config,
+                                                        
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
+            return INSTANCE.managedLedgerDecorator.apply(
+                    new TestManagedLedgerImpl(this, bk, store, config, 
scheduledExecutor, name, mlOwnershipChecker));
+        }
+    }
+
+    static class TestManagedLedgerImpl extends ManagedLedgerImpl {
+        public TestManagedLedgerImpl(ManagedLedgerFactoryImpl factory, 
BookKeeper bookKeeper, MetaStore store,
+                                     ManagedLedgerConfig config,
+                                     OrderedScheduler scheduledExecutor, 
String name,
+                                     Supplier<CompletableFuture<Boolean>> 
mlOwnershipChecker) {
+            super(factory, bookKeeper, store, config, scheduledExecutor, name, 
mlOwnershipChecker);
+        }
+
+        @Override
+        protected ManagedCursorImpl createCursor(BookKeeper bookKeeper, String 
cursorName) {
+            return 
INSTANCE.managedCursorDecorator.apply(super.createCursor(bookKeeper, 
cursorName));
+        }
+    }
+
+    @Getter
+    @Setter
+    private Function<PersistentTopic, PersistentTopic> 
persistentTopicDecorator = Function.identity();
+
+    @Getter
+    @Setter
+    private Function<PersistentSubscription, PersistentSubscription> 
persistentSubscriptionDecorator = Function.identity();
+
+    @Getter
+    @Setter
+    private Function<Dispatcher, Dispatcher> dispatcherDecorator = 
Function.identity();
+
+    @Getter
+    @Setter
+    private Function<ManagedLedgerFactoryImpl, ManagedLedgerFactoryImpl> 
managedLedgerFactoryDecorator = Function.identity();
+
+    @Getter
+    @Setter
+    private Function<ManagedLedgerImpl, ManagedLedgerImpl> 
managedLedgerDecorator = Function.identity();
+
+    @Getter
+    @Setter
+    private Function<ManagedCursorImpl, ManagedCursorImpl> 
managedCursorDecorator = Function.identity();
+
+    public void reset() {
+        persistentTopicDecorator = Function.identity();
+        persistentSubscriptionDecorator = Function.identity();
+        dispatcherDecorator = Function.identity();
+        managedLedgerFactoryDecorator = Function.identity();
+        managedLedgerDecorator = Function.identity();
+        managedCursorDecorator = Function.identity();
+    }
+
+    public void configure(ServiceConfiguration conf) {
+        conf.setTopicFactoryClassName(TestTopicFactory.class.getName());
+        
conf.setManagedLedgerStorageClassName(TestManagedLedgerStorage.class.getName());
+    }
+
+    public  <T extends Dispatcher> void applyDispatcherSpyDecorator(Class<T> 
dispatcherClass,
+                                                                    
java.util.function.Consumer<T> spyCustomizer) {
+        setDispatcherDecorator(createDispatcherSpyDecorator(dispatcherClass, 
spyCustomizer));
+    }
+
+    public static <T extends Dispatcher> Function<Dispatcher, Dispatcher> 
createDispatcherSpyDecorator(
+            Class<T> dispatcherClass, java.util.function.Consumer<T> 
spyCustomizer) {
+        return dispatcher -> {
+            Dispatcher spy = 
BrokerTestUtil.spyWithoutRecordingInvocations(dispatcher);
+            spyCustomizer.accept(dispatcherClass.cast(spy));
+            return spy;
+        };
+    }
+
+    public void 
applyCursorSpyDecorator(java.util.function.Consumer<ManagedCursorImpl> 
spyCustomizer) {
+        setManagedCursorDecorator(cursor -> {
+            ManagedCursorImpl spy = 
BrokerTestUtil.spyWithoutRecordingInvocations(cursor);
+            spyCustomizer.accept(spy);
+            return spy;
+        });
+    }
+}

Reply via email to