This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6df02655a3c [fix][broker][branch-3.0] Set ServiceUnitStateChannel topic compaction threshold explicitly, improve getOwnerAsync, and fix other bugs (#22064) (#22154)
6df02655a3c is described below

commit 6df02655a3c307d29a86b7f3e0ea46c16ad18aea
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Wed Feb 28 17:52:37 2024 -0800

    [fix][broker][branch-3.0] Set ServiceUnitStateChannel topic compaction threshold explicitly, improve getOwnerAsync, and fix other bugs (#22064) (#22154)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |  56 +++++++--
 .../channel/ServiceUnitStateChannelImpl.java       | 101 +++++++++++----
 .../extensions/store/LoadDataStoreFactory.java     |   7 +-
 .../store/TableViewLoadDataStoreImpl.java          |  79 +++++++++---
 .../client/impl/RawBatchMessageContainerImpl.java  |   1 +
 .../extensions/ExtensibleLoadManagerImplTest.java  | 136 ++++++++++++---------
 .../channel/ServiceUnitStateChannelTest.java       |  77 ++++++------
 .../extensions/store/LoadDataStoreTest.java        |  41 ++-----
 .../apache/pulsar/client/impl/TableViewImpl.java   |   8 +-
 .../loadbalance/ExtensibleLoadManagerTest.java     |   1 -
 10 files changed, 325 insertions(+), 182 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index baf3e7b1563..409bb55075b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance.extensions;
 import static java.lang.String.format;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
 import static 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
@@ -117,6 +118,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
     private static final long MONITOR_INTERVAL_IN_MILLIS = 120_000;
 
+    public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024;
+
     private static final String ELECTION_ROOT = 
"/loadbalance/extension/leader";
 
     private PulsarService pulsar;
@@ -173,6 +176,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
     private volatile boolean started = false;
 
+    private boolean configuredSystemTopics = false;
+
     private final AssignCounter assignCounter = new AssignCounter();
     @Getter
     private final UnloadCounter unloadCounter = new UnloadCounter();
@@ -262,6 +267,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         return 
ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
     }
 
+    public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
+        return pulsar.getLoadManager().get() instanceof 
ExtensibleLoadManagerWrapper;
+    }
+
     public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
         if (!(loadManager instanceof ExtensibleLoadManagerWrapper 
loadManagerWrapper)) {
             throw new IllegalArgumentException("The load manager should be 
'ExtensibleLoadManagerWrapper'.");
@@ -291,6 +300,27 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         createSystemTopic(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
     }
 
+    private static boolean configureSystemTopics(PulsarService pulsar) {
+        try {
+            if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
+                    && (pulsar.getConfiguration().isSystemTopicEnabled()
+                    && 
pulsar.getConfiguration().isTopicLevelPoliciesEnabled())) {
+                Long threshold = 
pulsar.getAdminClient().topicPolicies().getCompactionThreshold(TOPIC);
+                if (threshold == null || COMPACTION_THRESHOLD != 
threshold.longValue()) {
+                    
pulsar.getAdminClient().topicPolicies().setCompactionThreshold(TOPIC, 
COMPACTION_THRESHOLD);
+                    log.info("Set compaction threshold: {} bytes for system 
topic {}.", COMPACTION_THRESHOLD, TOPIC);
+                }
+            } else {
+                log.warn("System topic or topic level policies is disabled. "
+                        + "{} compaction threshold follows the broker or 
namespace policies.", TOPIC);
+            }
+            return true;
+        } catch (Exception e) {
+            log.error("Failed to set compaction threshold for system 
topic:{}", TOPIC, e);
+        }
+        return false;
+    }
+
     @Override
     public void start() throws PulsarServerException {
         if (this.started) {
@@ -329,9 +359,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
             try {
                 this.brokerLoadDataStore = LoadDataStoreFactory
-                        .create(pulsar.getClient(), 
BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class);
+                        .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, 
BrokerLoadData.class);
                 this.topBundlesLoadDataStore = LoadDataStoreFactory
-                        .create(pulsar.getClient(), 
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
+                        .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, 
TopBundlesLoadData.class);
             } catch (LoadDataStoreException e) {
                 throw new PulsarServerException(e);
             }
@@ -389,6 +419,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
             this.splitScheduler.start();
             this.initWaiter.countDown();
             this.started = true;
+            log.info("Started load manager.");
         } catch (Exception ex) {
             log.error("Failed to start the extensible load balance and close 
broker registry {}.",
                     this.brokerRegistry, ex);
@@ -523,7 +554,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
                     if (ex != null) {
                         assignCounter.incrementFailure();
                     }
-                    lookupRequests.remove(key, newFutureCreated.getValue());
+                    lookupRequests.remove(key);
                 });
             }
         }
@@ -736,13 +767,13 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
     }
 
     public static boolean isInternalTopic(String topic) {
-        return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
+        return topic.startsWith(TOPIC)
                 || topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
                 || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
     }
 
     @VisibleForTesting
-    void playLeader() {
+    synchronized void playLeader() {
         log.info("This broker:{} is setting the role from {} to {}",
                 pulsar.getBrokerId(), role, Leader);
         int retry = 0;
@@ -760,7 +791,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
                 serviceUnitStateChannel.scheduleOwnershipMonitor();
                 break;
             } catch (Throwable e) {
-                log.error("The broker:{} failed to set the role. Retrying {} 
th ...",
+                log.warn("The broker:{} failed to set the role. Retrying {} th 
...",
                         pulsar.getBrokerId(), ++retry, e);
                 try {
                     Thread.sleep(Math.min(retry * 10, 
MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
@@ -780,7 +811,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
     }
 
     @VisibleForTesting
-    void playFollower() {
+    synchronized void playFollower() {
         log.info("This broker:{} is setting the role from {} to {}",
                 pulsar.getBrokerId(), role, Follower);
         int retry = 0;
@@ -794,7 +825,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
                 topBundlesLoadDataStore.startProducer();
                 break;
             } catch (Throwable e) {
-                log.error("The broker:{} failed to set the role. Retrying {} 
th ...",
+                log.warn("The broker:{} failed to set the role. Retrying {} th 
...",
                         pulsar.getBrokerId(), ++retry, e);
                 try {
                     Thread.sleep(Math.min(retry * 10, 
MAX_ROLE_CHANGE_RETRY_DELAY_IN_MILLIS));
@@ -834,7 +865,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         return metricsCollection;
     }
 
-    private void monitor() {
+
+    @VisibleForTesting
+    protected void monitor() {
         try {
             initWaiter.await();
 
@@ -842,6 +875,11 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
             // Periodically check the role in case ZK watcher fails.
             var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
             if (isChannelOwner) {
+                // System topic config might fail due to the race condition
+                // with topic policy init(Topic policies cache have not init).
+                if (!configuredSystemTopics) {
+                    configuredSystemTopics = configureSystemTopics(pulsar);
+                }
                 if (role != Leader) {
                     log.warn("Current role:{} does not match with the channel 
ownership:{}. "
                             + "Playing the leader role.", role, 
isChannelOwner);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 8efa4e2e21a..220ce02ba5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -492,21 +492,28 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
             String serviceUnit,
             ServiceUnitState state,
             Optional<String> owner) {
-        CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
-                ? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData 
-> lookupData.flatMap(__ -> owner))
-                : CompletableFuture.completedFuture(Optional.empty());
-
-        return activeOwner
-                .thenCompose(broker -> broker
-                        .map(__ -> activeOwner)
-                        .orElseGet(() -> 
deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
-                .whenComplete((__, e) -> {
+        return deferGetOwnerRequest(serviceUnit)
+                .thenCompose(newOwner -> {
+                    if (newOwner == null) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+
+                    return brokerRegistry.lookupAsync(newOwner)
+                            .thenApply(lookupData -> {
+                                if (lookupData.isPresent()) {
+                                    return newOwner;
+                                } else {
+                                    throw new IllegalStateException(
+                                            "The new owner " + newOwner + " is 
inactive.");
+                                }
+                            });
+                }).whenComplete((__, e) -> {
                     if (e != null) {
-                        log.error("Failed to get active owner broker. 
serviceUnit:{}, state:{}, owner:{}",
-                                serviceUnit, state, owner, e);
+                        log.error("{} failed to get active owner broker. 
serviceUnit:{}, state:{}, owner:{}",
+                                brokerId, serviceUnit, state, owner, e);
                         
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
                     }
-                });
+                }).thenApply(Optional::ofNullable);
     }
 
     public CompletableFuture<Optional<String>> getOwnerAsync(String 
serviceUnit) {
@@ -544,6 +551,25 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
     }
 
+    private Optional<String> getOwner(String serviceUnit) {
+        ServiceUnitStateData data = tableview.get(serviceUnit);
+        ServiceUnitState state = state(data);
+        switch (state) {
+            case Owned -> {
+                return Optional.of(data.dstBroker());
+            }
+            case Splitting -> {
+                return Optional.of(data.sourceBroker());
+            }
+            case Init, Free -> {
+                return Optional.empty();
+            }
+            default -> {
+                return null;
+            }
+        }
+    }
+
     private long getNextVersionId(String serviceUnit) {
         var data = tableview.get(serviceUnit);
         return getNextVersionId(data);
@@ -697,7 +723,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
 
     private void log(Throwable e, String serviceUnit, ServiceUnitStateData 
data, ServiceUnitStateData next) {
         if (e == null) {
-            if (log.isDebugEnabled() || isTransferCommand(data)) {
+            if (debug() || isTransferCommand(data)) {
                 long handlerTotalCount = getHandlerTotalCounter(data).get();
                 long handlerFailureCount = 
getHandlerFailureCounter(data).get();
                 log.info("{} handled {} event for serviceUnit:{}, cur:{}, 
next:{}, "
@@ -736,6 +762,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) 
{
         var getOwnerRequest = getOwnerRequests.remove(serviceUnit);
         if (getOwnerRequest != null) {
+            if (debug()) {
+                log.info("Returned owner request for serviceUnit:{}", 
serviceUnit);
+            }
             getOwnerRequest.complete(data.dstBroker());
         }
         stateChangeListeners.notify(serviceUnit, data, null);
@@ -848,26 +877,52 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     }
 
     private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) 
{
+
         var requested = new MutableObject<CompletableFuture<String>>();
         try {
             return getOwnerRequests
                     .computeIfAbsent(serviceUnit, k -> {
-                        CompletableFuture<String> future = new 
CompletableFuture<>();
+                        var ownerBefore = getOwner(serviceUnit);
+                        if (ownerBefore != null && ownerBefore.isPresent()) {
+                            // Here, we do a quick active check first with the 
computeIfAbsent lock
+                            
brokerRegistry.lookupAsync(ownerBefore.get()).getNow(Optional.empty())
+                                    .ifPresent(__ -> requested.setValue(
+                                            
CompletableFuture.completedFuture(ownerBefore.get())));
+
+                            if (requested.getValue() != null) {
+                                return requested.getValue();
+                            }
+                        }
+
+
+                        CompletableFuture<String> future =
+                                new 
CompletableFuture<String>().orTimeout(inFlightStateWaitingTimeInMillis,
+                                                TimeUnit.MILLISECONDS)
+                                        .exceptionally(e -> {
+                                            var ownerAfter = 
getOwner(serviceUnit);
+                                            log.warn("{} failed to wait for 
owner for serviceUnit:{}; Trying to "
+                                                            + "return the 
current owner:{}",
+                                                    brokerId, serviceUnit, 
ownerAfter, e);
+                                            if (ownerAfter == null) {
+                                                throw new 
IllegalStateException(e);
+                                            }
+                                            return ownerAfter.orElse(null);
+                                        });
+                        if (debug()) {
+                            log.info("{} is waiting for owner for 
serviceUnit:{}", brokerId, serviceUnit);
+                        }
                         requested.setValue(future);
                         return future;
                     });
         } finally {
             var future = requested.getValue();
             if (future != null) {
-                future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, 
TimeUnit.MILLISECONDS)
-                        .whenComplete((v, e) -> {
-                                    if (e != null) {
-                                        getOwnerRequests.remove(serviceUnit, 
future);
-                                        log.warn("Failed to getOwner for 
serviceUnit:{}",
-                                                serviceUnit, e);
-                                    }
-                                }
-                        );
+                future.whenComplete((__, e) -> {
+                    getOwnerRequests.remove(serviceUnit);
+                    if (e != null) {
+                        log.warn("{} failed to getOwner for serviceUnit:{}", 
brokerId, serviceUnit, e);
+                    }
+                });
             }
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
index 18f39abd76b..bcb2657c67f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreFactory.java
@@ -18,15 +18,16 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.store;
 
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.PulsarService;
 
 /**
  * The load data store factory, use to create the load data store.
  */
 public class LoadDataStoreFactory {
 
-    public static <T> LoadDataStore<T> create(PulsarClient client, String 
name, Class<T> clazz)
+    public static <T> LoadDataStore<T> create(PulsarService pulsar, String 
name,
+                                              Class<T> clazz)
             throws LoadDataStoreException {
-        return new TableViewLoadDataStoreImpl<>(client, name, clazz);
+        return new TableViewLoadDataStoreImpl<>(pulsar, name, clazz);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
index 56afbef0456..d916e917162 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java
@@ -23,34 +23,46 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
-import org.apache.pulsar.common.util.FutureUtil;
 
 /**
  * The load data store, base on {@link TableView <T>}.
  *
  * @param <T> Load data type.
  */
+@Slf4j
 public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
 
+    private static final long 
LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2;
+
     private volatile TableView<T> tableView;
+    private volatile long tableViewLastUpdateTimestamp;
 
     private volatile Producer<T> producer;
 
+    private final ServiceConfiguration conf;
+
     private final PulsarClient client;
 
     private final String topic;
 
     private final Class<T> clazz;
 
-    public TableViewLoadDataStoreImpl(PulsarClient client, String topic, 
Class<T> clazz) throws LoadDataStoreException {
+    public TableViewLoadDataStoreImpl(PulsarService pulsar, String topic, 
Class<T> clazz)
+            throws LoadDataStoreException {
         try {
-            this.client = client;
+            this.conf = pulsar.getConfiguration();
+            this.client = pulsar.getClient();
             this.topic = topic;
             this.clazz = clazz;
         } catch (Exception e) {
@@ -60,40 +72,36 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
 
     @Override
     public synchronized CompletableFuture<Void> pushAsync(String key, T 
loadData) {
-        if (producer == null) {
-            return FutureUtil.failedFuture(new IllegalStateException("producer 
has not been started"));
-        }
+        validateProducer();
         return 
producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {});
     }
 
     @Override
     public synchronized CompletableFuture<Void> removeAsync(String key) {
-        if (producer == null) {
-            return FutureUtil.failedFuture(new IllegalStateException("producer 
has not been started"));
-        }
+        validateProducer();
         return 
producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {});
     }
 
     @Override
     public synchronized Optional<T> get(String key) {
-        validateTableViewStart();
+        validateTableView();
         return Optional.ofNullable(tableView.get(key));
     }
 
     @Override
     public synchronized void forEach(BiConsumer<String, T> action) {
-        validateTableViewStart();
+        validateTableView();
         tableView.forEach(action);
     }
 
     public synchronized Set<Map.Entry<String, T>> entrySet() {
-        validateTableViewStart();
+        validateTableView();
         return tableView.entrySet();
     }
 
     @Override
     public synchronized int size() {
-        validateTableViewStart();
+        validateTableView();
         return tableView.size();
     }
 
@@ -116,6 +124,8 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
         if (tableView == null) {
             try {
                 tableView = 
client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
+                tableView.forEachAndListen((k, v) ->
+                        tableViewLastUpdateTimestamp = 
System.currentTimeMillis());
             } catch (PulsarClientException e) {
                 tableView = null;
                 throw new LoadDataStoreException(e);
@@ -150,9 +160,48 @@ public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
         start();
     }
 
-    private synchronized void validateTableViewStart() {
+    private void validateProducer() {
+        if (producer == null || !producer.isConnected()) {
+            try {
+                if (producer != null) {
+                    producer.close();
+                }
+                producer = null;
+                startProducer();
+                log.info("Restarted producer on {}", topic);
+            } catch (Exception e) {
+                log.error("Failed to restart producer on {}", topic, e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private void validateTableView() {
+        String restartReason = null;
+
         if (tableView == null) {
-            throw new IllegalStateException("table view has not been started");
+            restartReason = "table view is null";
+        } else {
+            long inactiveDuration = System.currentTimeMillis() - 
tableViewLastUpdateTimestamp;
+            long threshold = 
TimeUnit.MINUTES.toMillis(conf.getLoadBalancerReportUpdateMaxIntervalMinutes())
+                    * 
LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART;
+            if (inactiveDuration > threshold) {
+                restartReason = String.format("inactiveDuration=%d secs > 
threshold = %d secs",
+                        TimeUnit.MILLISECONDS.toSeconds(inactiveDuration),
+                        TimeUnit.MILLISECONDS.toSeconds(threshold));
+            }
+        }
+
+        if (StringUtils.isNotBlank(restartReason)) {
+            tableViewLastUpdateTimestamp = 0;
+            try {
+                closeTableView();
+                startTableView();
+                log.info("Restarted tableview on {}, {}", topic, 
restartReason);
+            } catch (Exception e) {
+                log.error("Failed to restart tableview on {}", topic, e);
+                throw new RuntimeException(e);
+            }
         }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
index ba8d3db7178..374f1e30c0a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java
@@ -187,6 +187,7 @@ public class RawBatchMessageContainerImpl extends BatchMessageContainerImpl {
         idData.writeTo(buf);
         buf.writeInt(metadataAndPayload.readableBytes());
         buf.writeBytes(metadataAndPayload);
+        metadataAndPayload.release();
         encryptedPayload.release();
         clear();
         return buf;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 4ec884a624e..e87532cdc6c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -115,6 +115,7 @@ import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.awaitility.Awaitility;
 import org.mockito.MockedStatic;
+import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -141,64 +142,53 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
 
     private final String defaultTestNamespace = "public/test";
 
+    private static void initConfig(ServiceConfiguration conf){
+        conf.setForceDeleteNamespaceAllowed(true);
+        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+        conf.setAllowAutoTopicCreation(true);
+        
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+        conf.setLoadBalancerSheddingEnabled(false);
+        conf.setLoadBalancerDebugModeEnabled(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+    }
+
     @BeforeClass
     @Override
     public void setup() throws Exception {
-        try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic =
-                     mockStatic(ServiceUnitStateChannelImpl.class)) {
-            channelMockedStatic.when(() -> 
ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class)))
-                    .thenAnswer(invocation -> {
-                        PulsarService pulsarService = 
invocation.getArgument(0);
-                        // Set the inflight state waiting time and ownership 
monitor delay time to 5 seconds to avoid
-                        // stuck when doing unload.
-                        return new ServiceUnitStateChannelImpl(pulsarService, 
5 * 1000, 1);
-                    });
-            conf.setForceDeleteNamespaceAllowed(true);
-            conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
-            conf.setAllowAutoTopicCreation(true);
-            
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-            
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-            conf.setLoadBalancerSheddingEnabled(false);
-            conf.setLoadBalancerDebugModeEnabled(true);
-            conf.setTopicLevelPoliciesEnabled(true);
-            super.internalSetup(conf);
-            pulsar1 = pulsar;
-            ServiceConfiguration defaultConf = getDefaultConf();
-            defaultConf.setAllowAutoTopicCreation(true);
-            defaultConf.setForceDeleteNamespaceAllowed(true);
-            
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
-            
defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
-            defaultConf.setLoadBalancerSheddingEnabled(false);
-            defaultConf.setTopicLevelPoliciesEnabled(true);
-            additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf);
-            pulsar2 = additionalPulsarTestContext.getPulsarService();
-
-            setPrimaryLoadManager();
-
-            setSecondaryLoadManager();
-
-            admin.clusters().createCluster(this.conf.getClusterName(),
-                    
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
-            admin.tenants().createTenant("public",
-                    new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
-                            Sets.newHashSet(this.conf.getClusterName())));
-            admin.namespaces().createNamespace("public/default");
-            
admin.namespaces().setNamespaceReplicationClusters("public/default",
-                    Sets.newHashSet(this.conf.getClusterName()));
-
-            admin.namespaces().createNamespace(defaultTestNamespace);
-            
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
-                    Sets.newHashSet(this.conf.getClusterName()));
-        }
+        // Set the inflight state waiting time and ownership monitor delay 
time to 5 seconds to avoid
+        // stuck when doing unload.
+        initConfig(conf);
+        super.internalSetup(conf);
+        pulsar1 = pulsar;
+        ServiceConfiguration defaultConf = getDefaultConf();
+        initConfig(defaultConf);
+        additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf);
+        pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+        setPrimaryLoadManager();
+
+        setSecondaryLoadManager();
+
+        admin.clusters().createCluster(this.conf.getClusterName(),
+                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+                        Sets.newHashSet(this.conf.getClusterName())));
+        admin.namespaces().createNamespace("public/default");
+        admin.namespaces().setNamespaceReplicationClusters("public/default",
+                Sets.newHashSet(this.conf.getClusterName()));
+
+        admin.namespaces().createNamespace(defaultTestNamespace);
+        
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
+                Sets.newHashSet(this.conf.getClusterName()));
     }
 
     @Override
     @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
-        pulsar1 = null;
-        pulsar2.close();
-        super.internalCleanup();
         this.additionalPulsarTestContext.close();
+        super.internalCleanup();
     }
 
     @BeforeMethod(alwaysRun = true)
@@ -236,9 +226,6 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         Optional<BrokerLookupData> brokerLookupData1 = 
secondaryLoadManager.assign(Optional.empty(), bundle).get();
         assertEquals(brokerLookupData, brokerLookupData1);
 
-        verify(primaryLoadManager, times(1)).getBrokerSelectionStrategy();
-        verify(secondaryLoadManager, times(0)).getBrokerSelectionStrategy();
-
         Optional<LookupResult> lookupResult = pulsar2.getNamespaceService()
                 .getBrokerServiceUrlAsync(topicName, null).get();
         assertTrue(lookupResult.isPresent());
@@ -462,7 +449,8 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
                 "specified_positions_divide", List.of(bundleRanges.get(0), 
bundleRanges.get(1), splitPosition));
 
         BundlesData bundlesData = admin.namespaces().getBundles(namespace);
-        assertEquals(bundlesData.getNumBundles(), numBundles + 1);
+        Awaitility.waitAtMost(15, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(bundlesData.getNumBundles(), 
numBundles + 1));
         String lowBundle = String.format("0x%08x", bundleRanges.get(0));
         String midBundle = String.format("0x%08x", splitPosition);
         String highBundle = String.format("0x%08x", bundleRanges.get(1));
@@ -475,15 +463,26 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         final String namespace = "public/testDeleteNamespaceBundle";
         admin.namespaces().createNamespace(namespace, 3);
         TopicName topicName = TopicName.get(namespace + 
"/test-delete-namespace-bundle");
-        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
 
-        String broker = admin.lookups().lookupTopic(topicName.toString());
-        log.info("Assign the bundle {} to {}", bundle, broker);
-
-        checkOwnershipState(broker, bundle);
 
-        admin.namespaces().deleteNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange());
-        assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+        Awaitility.await()
+                .atMost(15, TimeUnit.SECONDS)
+                .ignoreExceptions()
+                .untilAsserted(() -> {
+                    NamespaceBundle bundle = getBundleAsync(pulsar1, 
topicName).get();
+                    String broker = 
admin.lookups().lookupTopic(topicName.toString());
+                    log.info("Assign the bundle {} to {}", bundle, broker);
+                    checkOwnershipState(broker, bundle);
+                    
admin.namespaces().deleteNamespaceBundle(topicName.getNamespace(), 
bundle.getBundleRange(), true);
+                    // This can fail if the system topic lookup happens 
asynchronously before this point.
+                    // The enclosing await retries the whole block if it fails.
+                    
assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+                });
+
+        Awaitility.await()
+                .atMost(15, TimeUnit.SECONDS)
+                .ignoreExceptions()
+                .untilAsserted(() -> 
admin.namespaces().deleteNamespace(namespace, true));
     }
 
     @Test(timeOut = 30 * 1000)
@@ -708,7 +707,7 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(result, expectedBrokerServiceUrl);
     }
 
-    @Test
+    @Test(priority = 10)
     public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws 
Exception {
         var topBundlesLoadDataStorePrimary =
                 (LoadDataStore) 
FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", 
true);
@@ -1219,6 +1218,21 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         admin.brokers().healthcheck(TopicVersion.V2);
     }
 
+    @Test(timeOut = 30 * 1000)
+    public void compactionScheduleTest() {
+        Awaitility.await()
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .atMost(30, TimeUnit.SECONDS)
+                .ignoreExceptions()
+                .untilAsserted(() -> { // wait until true
+                    primaryLoadManager.monitor();
+                    secondaryLoadManager.monitor();
+                    var threshold = admin.topicPolicies()
+                            
.getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false);
+                    AssertJUnit.assertEquals(5 * 1024 * 1024, threshold == 
null ? 0 : threshold.longValue());
+                });
+    }
+
     private static abstract class MockBrokerFilter implements BrokerFilter {
 
         @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 158b91fd277..84988a3b3bf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -86,7 +86,6 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.TableViewImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
@@ -331,23 +330,6 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         return errorCnt;
     }
 
-    @Test(priority = 1)
-    public void compactionScheduleTest() {
-
-        Awaitility.await()
-                .pollInterval(200, TimeUnit.MILLISECONDS)
-                .atMost(5, TimeUnit.SECONDS)
-                .untilAsserted(() -> { // wait until true
-                    try {
-                        var threshold = admin.topicPolicies()
-                                
.getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, false).longValue();
-                        assertEquals(5 * 1024 * 1024, threshold);
-                    } catch (Exception e) {
-                        ;
-                    }
-                });
-    }
-
     @Test(priority = 2)
     public void assignmentTest()
             throws ExecutionException, InterruptedException, 
IllegalAccessException, TimeoutException {
@@ -927,8 +909,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test(priority = 10)
-    public void conflictAndCompactionTest() throws ExecutionException, 
InterruptedException, TimeoutException,
-            IllegalAccessException, PulsarClientException, 
PulsarServerException {
+    public void conflictAndCompactionTest() throws Exception {
         String bundle = String.format("%s/%s", "public/default", 
"0x0000000a_0xffffffff");
         var owner1 = channel1.getOwnerAsync(bundle);
         var owner2 = channel2.getOwnerAsync(bundle);
@@ -961,26 +942,41 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         Field strategicCompactorField = 
FieldUtils.getDeclaredField(PulsarService.class, "strategicCompactor", true);
         FieldUtils.writeField(strategicCompactorField, pulsar1, compactor, 
true);
         FieldUtils.writeField(strategicCompactorField, pulsar2, compactor, 
true);
-        Awaitility.await()
-                .pollInterval(200, TimeUnit.MILLISECONDS)
-                .atMost(140, TimeUnit.SECONDS)
-                .untilAsserted(() -> {
-                    channel1.publishAssignEventAsync(bundle, brokerId1);
-                    verify(compactor, times(1))
-                            .compact(eq(ServiceUnitStateChannelImpl.TOPIC), 
any());
-                });
+
+        var threshold = admin.topicPolicies()
+                .getCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC);
+        admin.topicPolicies()
+                .setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, 0);
+
+        try {
+            Awaitility.await()
+                    .pollInterval(200, TimeUnit.MILLISECONDS)
+                    .atMost(140, TimeUnit.SECONDS)
+                    .untilAsserted(() -> {
+                        channel1.publishAssignEventAsync(bundle, brokerId1);
+                        verify(compactor, times(1))
+                                
.compact(eq(ServiceUnitStateChannelImpl.TOPIC), any());
+                    });
+
+
+            var channel3 = createChannel(pulsar);
+            channel3.start();
+            Awaitility.await()
+                    .pollInterval(200, TimeUnit.MILLISECONDS)
+                    .atMost(5, TimeUnit.SECONDS)
+                    .untilAsserted(() -> assertEquals(
+                            channel3.getOwnerAsync(bundle).get(), 
Optional.of(brokerId1)));
+            channel3.close();
+        } finally {
+            FieldUtils.writeDeclaredField(channel2,
+                    "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
+            if (threshold != null) {
+                admin.topicPolicies()
+                        
.setCompactionThreshold(ServiceUnitStateChannelImpl.TOPIC, threshold);
+            }
+        }
 
 
-        var channel3 = createChannel(pulsar);
-        channel3.start();
-        Awaitility.await()
-                .pollInterval(200, TimeUnit.MILLISECONDS)
-                .atMost(5, TimeUnit.SECONDS)
-                .untilAsserted(() -> assertEquals(
-                        channel3.getOwnerAsync(bundle).get(), 
Optional.of(brokerId1)));
-        channel3.close();
-        FieldUtils.writeDeclaredField(channel2,
-                "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
     }
 
     @Test(priority = 11)
@@ -1583,7 +1579,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         // verify getOwnerAsync times out because the owner is inactive now.
         long start = System.currentTimeMillis();
         var ex = expectThrows(ExecutionException.class, () -> 
channel1.getOwnerAsync(bundle).get());
-        assertTrue(ex.getCause() instanceof TimeoutException);
+        assertTrue(ex.getCause() instanceof IllegalStateException);
         assertTrue(System.currentTimeMillis() - start >= 1000);
 
         // simulate ownership cleanup(no selected owner) by the leader channel
@@ -1783,6 +1779,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
             throws IllegalAccessException {
         var tv = (TableViewImpl<ServiceUnitStateData>)
                 FieldUtils.readField(channel, "tableview", true);
+        var getOwnerRequests = (Map<String, CompletableFuture<String>>)
+                FieldUtils.readField(channel, "getOwnerRequests", true);
         var cache = (ConcurrentMap<String, ServiceUnitStateData>)
                 FieldUtils.readField(tv, "data", true);
         if(val == null){
@@ -1790,6 +1788,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         } else {
             cache.put(serviceUnit, val);
         }
+        getOwnerRequests.clear();
     }
 
     private static void cleanOpsCounters(ServiceUnitStateChannel channel)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
index f486370400c..d25cba2bd1b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStoreTest.java
@@ -20,8 +20,6 @@ package org.apache.pulsar.broker.loadbalance.extensions.store;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertTrue;
 
 import com.google.common.collect.Sets;
@@ -29,6 +27,7 @@ import lombok.AllArgsConstructor;
 import lombok.Cleanup;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.commons.lang.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -40,7 +39,6 @@ import org.testng.annotations.Test;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 
 @Test(groups = "broker")
 public class LoadDataStoreTest extends MockedPulsarServiceBaseTest {
@@ -76,7 +74,7 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
 
         @Cleanup
         LoadDataStore<MyClass> loadDataStore =
-                LoadDataStoreFactory.create(pulsar.getClient(), topic, 
MyClass.class);
+                LoadDataStoreFactory.create(pulsar, topic, MyClass.class);
         loadDataStore.startProducer();
         loadDataStore.startTableView();
         MyClass myClass1 = new MyClass("1", 1);
@@ -110,7 +108,7 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
 
         @Cleanup
         LoadDataStore<Integer> loadDataStore =
-                LoadDataStoreFactory.create(pulsar.getClient(), topic, 
Integer.class);
+                LoadDataStoreFactory.create(pulsar, topic, Integer.class);
         loadDataStore.startProducer();
         loadDataStore.startTableView();
 
@@ -135,7 +133,7 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
     public void testTableViewRestart() throws Exception {
         String topic = TopicDomain.persistent + "://" + 
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
         LoadDataStore<Integer> loadDataStore =
-                LoadDataStoreFactory.create(pulsar.getClient(), topic, 
Integer.class);
+                LoadDataStoreFactory.create(pulsar, topic, Integer.class);
         loadDataStore.startProducer();
 
         loadDataStore.startTableView();
@@ -145,43 +143,26 @@ public class LoadDataStoreTest extends 
MockedPulsarServiceBaseTest {
         loadDataStore.closeTableView();
 
         loadDataStore.pushAsync("1", 2).get();
-        Exception ex = null;
-        try {
-            loadDataStore.get("1");
-        } catch (IllegalStateException e) {
-            ex = e;
-        }
-        assertNotNull(ex);
-        loadDataStore.startTableView();
         Awaitility.await().untilAsserted(() -> 
assertEquals(loadDataStore.get("1").get(), 2));
+
+        loadDataStore.pushAsync("1", 3).get();
+        FieldUtils.writeField(loadDataStore, "tableViewLastUpdateTimestamp", 0 
, true);
+        Awaitility.await().untilAsserted(() -> 
assertEquals(loadDataStore.get("1").get(), 3));
     }
 
     @Test
     public void testProducerStop() throws Exception {
         String topic = TopicDomain.persistent + "://" + 
NamespaceName.SYSTEM_NAMESPACE + "/" + UUID.randomUUID();
         LoadDataStore<Integer> loadDataStore =
-                LoadDataStoreFactory.create(pulsar.getClient(), topic, 
Integer.class);
+                LoadDataStoreFactory.create(pulsar, topic, Integer.class);
         loadDataStore.startProducer();
         loadDataStore.pushAsync("1", 1).get();
         loadDataStore.removeAsync("1").get();
 
         loadDataStore.close();
 
-        try {
-            loadDataStore.pushAsync("2", 2).get();
-            fail();
-        } catch (ExecutionException ex) {
-            assertTrue(ex.getCause() instanceof IllegalStateException);
-        }
-        try {
-            loadDataStore.removeAsync("2").get();
-            fail();
-        } catch (ExecutionException ex) {
-            assertTrue(ex.getCause() instanceof IllegalStateException);
-        }
-        loadDataStore.startProducer();
-        loadDataStore.pushAsync("3", 3).get();
-        loadDataStore.removeAsync("3").get();
+        loadDataStore.pushAsync("2", 2).get();
+        loadDataStore.removeAsync("2").get();
     }
 
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 560636f9462..d46f7fa1408 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -284,8 +284,14 @@ public class TableViewImpl<T> implements TableView<T> {
                         log.error("Reader {} was closed while reading tail 
messages.",
                                 reader.getTopic(), ex);
                     } else {
+                        // Retrying on the other exceptions such as 
NotConnectedException
+                        try {
+                            Thread.sleep(50);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        }
                         log.warn("Reader {} was interrupted while reading tail 
messages. "
-                                        + "Retrying..", reader.getTopic(), ex);
+                                + "Retrying..", reader.getTopic(), ex);
                         readTailMessages(reader);
                     }
                     return null;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index e262b27fe23..b9707ea76c3 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -91,7 +91,6 @@ public class ExtensibleLoadManagerTest extends 
TestRetrySupport {
                 
"org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder");
         brokerEnvs.put("forceDeleteNamespaceAllowed", "true");
         brokerEnvs.put("loadBalancerDebugModeEnabled", "true");
-        brokerEnvs.put("topicLevelPoliciesEnabled", "false");
         brokerEnvs.put("PULSAR_MEM", "-Xmx512M");
         spec.brokerEnvs(brokerEnvs);
         pulsarCluster = PulsarCluster.forSpec(spec);

Reply via email to