This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5daea49e7793982a468585cd0e6853b64e74a5fa
Author: Kai Wang <[email protected]>
AuthorDate: Wed Feb 19 21:41:07 2025 +0800

    [improve][broker] Fix non-persistent system topic schema compatibility 
(#23286)
    
    ### Motivation
    
    When upgrading the broker from `3.0.x` to `3.3.x` with 
`ExtensibleLoadManagerImpl` enabled, the broker throws an `Unable to read schema` 
exception and fails to start. This issue is caused by 
https://github.com/apache/pulsar/pull/22055 .
    
    ### Modifications
    
    Add a new class, `NonPersistentSystemTopic`, which is used for 
non-persistent system topics.
    
    (cherry picked from commit 7dbd8a507fc07d4fd5dc813ad76ee8080afd43cb)
---
 .../pulsar/broker/service/BrokerService.java       |  7 +++-
 .../nonpersistent/NonPersistentSystemTopic.java    | 32 +++++++++++++++++
 .../extensions/ExtensibleLoadManagerImplTest.java  | 42 ++++++++++++++++++++++
 3 files changed, 80 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 362fc66e4d8..922d85df709 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -128,6 +128,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotRea
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
 import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
 import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
@@ -1325,7 +1326,11 @@ public class BrokerService implements Closeable {
         final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic;
         try {
-            nonPersistentTopic = newTopic(topic, null, this, 
NonPersistentTopic.class);
+            if (isSystemTopic(topic)) {
+                nonPersistentTopic = new NonPersistentSystemTopic(topic, this);
+            } else {
+                nonPersistentTopic = newTopic(topic, null, this, 
NonPersistentTopic.class);
+            }
             nonPersistentTopic.setCreateFuture(topicFuture);
         } catch (Throwable e) {
             log.warn("Failed to create topic {}", topic, e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java
new file mode 100644
index 00000000000..9b867c9a8b3
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSystemTopic.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.nonpersistent;
+
+import org.apache.pulsar.broker.service.BrokerService;
+
+public class NonPersistentSystemTopic extends NonPersistentTopic {
+    public NonPersistentSystemTopic(String topic, BrokerService brokerService) 
{
+        super(topic, brokerService);
+    }
+
+    @Override
+    public boolean isSystemTopic() {
+        return true;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 0909944f349..0417b6fc144 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -78,6 +78,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.reflect.FieldUtils;
@@ -101,6 +102,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
 import 
org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
 import 
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
+import 
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
 import 
org.apache.pulsar.broker.loadbalance.extensions.store.TableViewLoadDataStoreImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.lookup.LookupResult;
@@ -110,6 +112,7 @@ import 
org.apache.pulsar.broker.namespace.NamespaceBundleSplitListener;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -130,6 +133,7 @@ import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BrokerAssignment;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -137,6 +141,7 @@ import 
org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
@@ -2183,6 +2188,43 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
                 });
     }
 
+    @Test
+    public void testSystemNonPersistentTopicSchemaCompatibility() throws 
Exception {
+        String topicName = 
ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
+        NonPersistentSystemTopic topic = new 
NonPersistentSystemTopic(topicName, pulsar.getBrokerService());
+        Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, 
topic.getSchemaCompatibilityStrategy());
+
+        var brokerLoadDataStore = LoadDataStoreFactory.create(pulsar, 
topicName, BrokerLoadDataV1.class);
+        brokerLoadDataStore.init();
+        brokerLoadDataStore.pushAsync("key", new BrokerLoadDataV1()).get();
+        Awaitility.await().until(() -> {
+            var data = brokerLoadDataStore.get("key");
+            return data.isPresent();
+        });
+        brokerLoadDataStore.pushAsync("key", null).get();
+        brokerLoadDataStore.close();
+    }
+
+    @Data
+    private static class BrokerLoadDataV1 {
+        private ResourceUsage cpu;
+        private ResourceUsage memory;
+        private ResourceUsage directMemory;
+        private ResourceUsage bandwidthIn;
+        private ResourceUsage bandwidthOut;
+        private double msgThroughputIn;
+        private double msgThroughputOut;
+        private double msgRateIn;
+        private double msgRateOut;
+        private int bundleCount;
+        private int topics;
+        private double maxResourceUsage;
+        private double weightedMaxEMA;
+        private double msgThroughputEMA;
+        private long updatedAt;
+        private long reportedAt;
+    }
+
     @Test(timeOut = 30 * 1000)
     public void testMonitorBrokerRegistry() throws MetadataStoreException {
         primaryLoadManager.getBrokerRegistry().unregister();

Reply via email to