This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bbe2cabc3ec [fix][broker] Fix bug causing loss of migrated information
when setting other localPolicies in namespace (#23764)
bbe2cabc3ec is described below
commit bbe2cabc3ec0375607cb12665cab0b4745dbd36e
Author: hrzzzz <[email protected]>
AuthorDate: Fri Dec 20 20:04:24 2024 +0800
[fix][broker] Fix bug causing loss of migrated information when setting
other localPolicies in namespace (#23764)
Co-authored-by: ruihongzhou <[email protected]>
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 20 ++++++++++------
.../pulsar/common/naming/NamespaceBundles.java | 3 ++-
.../apache/pulsar/broker/admin/NamespacesTest.java | 27 ++++++++++++++++++++++
.../pulsar/common/policies/data/LocalPolicies.java | 11 ++++++++-
4 files changed, 52 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index d80e2487b4f..ca4c685b280 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -960,7 +960,8 @@ public abstract class NamespacesBase extends AdminResource {
LocalPolicies localPolicies = oldPolicies.map(
policies -> new LocalPolicies(policies.bundles,
bookieAffinityGroup,
- policies.namespaceAntiAffinityGroup))
+ policies.namespaceAntiAffinityGroup,
+ policies.migrated))
.orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
bookieAffinityGroup,
null));
@@ -1779,7 +1780,8 @@ public abstract class NamespacesBase extends AdminResource {
getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)->
lp.map(policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
- antiAffinityGroup))
+ antiAffinityGroup,
+ policies.migrated))
.orElseGet(() -> new LocalPolicies(defaultBundle(),
null, antiAffinityGroup))
);
@@ -1816,7 +1818,8 @@ public abstract class NamespacesBase extends AdminResource {
getLocalPolicies().setLocalPolicies(namespaceName, (policies)->
new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
- null));
+ null,
+ policies.migrated));
log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e);
@@ -2765,10 +2768,13 @@ public abstract class NamespacesBase extends AdminResource {
protected void internalEnableMigration(boolean migrated) {
validateSuperUserAccess();
try {
- getLocalPolicies().setLocalPolicies(namespaceName, (policies) -> {
- policies.migrated = migrated;
- return policies;
- });
+ getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, oldPolicies -> oldPolicies.map(
+ policies -> new LocalPolicies(policies.bundles,
+ policies.bookieAffinityGroup,
+ policies.namespaceAntiAffinityGroup,
+ migrated))
+ .orElseGet(() -> new LocalPolicies(getBundles(config().getDefaultNumberOfNamespaceBundles()),
+ null, null, migrated)));
log.info("Successfully updated migration on namespace {}", namespaceName);
} catch (Exception e) {
log.error("Failed to update migration on namespace {}", namespaceName, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
index fa7baeaa606..3ee365cdd45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java
@@ -197,6 +197,7 @@ public class NamespaceBundles {
public LocalPolicies toLocalPolicies() {
return new LocalPolicies(this.getBundlesData(),
localPolicies.map(lp -> lp.getLeft().bookieAffinityGroup).orElse(null),
- localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null));
+ localPolicies.map(lp -> lp.getLeft().namespaceAntiAffinityGroup).orElse(null),
+ localPolicies.map(lp -> lp.getLeft().migrated).orElse(false));
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index f2948660952..18cc449d15d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -102,6 +102,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -2195,4 +2196,30 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
admin.namespaces().deleteNamespace(namespace);
}
+
+ public void testMigratedInfoIsNotLostDuringOtherLocalPoliciesUpdate() throws Exception {
+ String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace");
+ admin.namespaces().createNamespace(namespace, Set.of(testLocalCluster));
+
+ admin.namespaces().updateMigrationState(namespace, true);
+ assertTrue(admin.namespaces().getPolicies(namespace).migrated);
+
+ String bookieAffinityGroupPrimary = "group1";
+ admin.namespaces().setBookieAffinityGroup(namespace,
+ BookieAffinityGroupData.builder().bookkeeperAffinityGroupPrimary(bookieAffinityGroupPrimary).build());
+ assertEquals(admin.namespaces().getBookieAffinityGroup(namespace).getBookkeeperAffinityGroupPrimary(),
+ bookieAffinityGroupPrimary);
+ assertTrue(admin.namespaces().getPolicies(namespace).migrated);
+
+ String namespaceAntiAffinityGroup = "group2";
+ admin.namespaces().setNamespaceAntiAffinityGroup(namespace, namespaceAntiAffinityGroup);
+ assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), namespaceAntiAffinityGroup);
+ assertTrue(admin.namespaces().getPolicies(namespace).migrated);
+
+ admin.namespaces().deleteBookieAffinityGroup(namespace);
+ assertTrue(admin.namespaces().getPolicies(namespace).migrated);
+
+ admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
+ assertTrue(admin.namespaces().getPolicies(namespace).migrated);
+ }
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
index 3b17dbe067e..43f5130eb9f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/LocalPolicies.java
@@ -34,20 +34,29 @@ public class LocalPolicies {
public final BookieAffinityGroupData bookieAffinityGroup;
// namespace anti-affinity-group
public final String namespaceAntiAffinityGroup;
- public boolean migrated;
+ public final boolean migrated;
public LocalPolicies() {
bundles = defaultBundle();
bookieAffinityGroup = null;
namespaceAntiAffinityGroup = null;
+ migrated = false;
}
public LocalPolicies(BundlesData data,
BookieAffinityGroupData bookieAffinityGroup,
String namespaceAntiAffinityGroup) {
+ this(data, bookieAffinityGroup, namespaceAntiAffinityGroup, false);
+ }
+
+ public LocalPolicies(BundlesData data,
+ BookieAffinityGroupData bookieAffinityGroup,
+ String namespaceAntiAffinityGroup,
+ boolean migrated) {
bundles = data;
this.bookieAffinityGroup = bookieAffinityGroup;
this.namespaceAntiAffinityGroup = namespaceAntiAffinityGroup;
+ this.migrated = migrated;
}
}
\ No newline at end of file