BewareMyPower commented on code in PR #24427:
URL: https://github.com/apache/pulsar/pull/24427#discussion_r2297151616
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java:
##########
@@ -178,92 +190,248 @@ public CompletableFuture<Void>
deleteTopicPoliciesAsync(TopicName topicName,
log.info("Skip delete topic-level policies because {} has been
removed before", changeEvents);
return CompletableFuture.completedFuture(null);
}
- return sendTopicPolicyEvent(topicName, ActionType.DELETE, null,
- keepGlobalPolicies);
+ // delete local policy
+ return updateTopicPoliciesAsync(topicName, null, false,
ActionType.DELETE, true)
+ .thenCompose(__ ->
+ // delete global policy
+ updateTopicPoliciesAsync(topicName, null, true,
ActionType.DELETE, true));
});
}
@Override
- public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName
topicName, TopicPolicies policies) {
+ public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName
topicName,
+ boolean
isGlobalPolicy,
+ boolean
skipUpdateWhenTopicPolicyDoesntExist,
+
Consumer<TopicPolicies> policyUpdater) {
if
(NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
return CompletableFuture.failedFuture(new
BrokerServiceException.NotAllowedException(
"Not allowed to update topic policy for the heartbeat
topic"));
}
- return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies,
false);
+ return updateTopicPoliciesAsync(topicName, policyUpdater,
isGlobalPolicy, ActionType.UPDATE,
+ skipUpdateWhenTopicPolicyDoesntExist);
}
- private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName,
ActionType actionType,
- @Nullable TopicPolicies policies, boolean
keepGlobalPoliciesAfterDeleting) {
- return pulsarService.getPulsarResources().getNamespaceResources()
- .getPoliciesAsync(topicName.getNamespaceObject())
- .thenCompose(namespacePolicies -> {
- if (namespacePolicies.isPresent() &&
namespacePolicies.get().deleted) {
- log.debug("[{}] skip sending topic policy event since
the namespace is deleted", topicName);
- return CompletableFuture.completedFuture(null);
- }
-
- try {
- createSystemTopicFactoryIfNeeded();
- } catch (PulsarServerException e) {
- return CompletableFuture.failedFuture(e);
- }
- CompletableFuture<Void> result = new CompletableFuture<>();
- writerCaches.get(topicName.getNamespaceObject())
- .whenComplete((writer, cause) -> {
- if (cause != null) {
-
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());
- result.completeExceptionally(cause);
- } else {
- CompletableFuture<MessageId> writeFuture =
-
sendTopicPolicyEventInternal(topicName, actionType, writer, policies,
-
keepGlobalPoliciesAfterDeleting);
- writeFuture.whenComplete((messageId, e) ->
{
- if (e != null) {
- result.completeExceptionally(e);
- } else {
- if (messageId != null) {
- result.complete(null);
+ private CompletableFuture<Void> updateTopicPoliciesAsync(TopicName
topicName,
+
Consumer<TopicPolicies> policyUpdater,
+ boolean
isGlobalPolicy,
+ ActionType
actionType,
+ boolean
skipUpdateWhenTopicPolicyDoesntExist) {
+ if (closed.get()) {
+ return CompletableFuture.failedFuture(new
BrokerServiceException(getClass().getName() + " is closed."));
+ }
+ TopicName partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
+ Pair<TopicName, Boolean> sequencerKey = Pair.of(partitionedTopicName,
isGlobalPolicy);
+
+ CompletableFuture<Void> operationFuture = new CompletableFuture<>();
+
+ // Chain the operation on the sequencer for the specific topic and
policy type
+ topicPolicyUpdateSequencer.compute(sequencerKey, (key, existingFuture)
-> {
+ CompletableFuture<Void> chain = (existingFuture == null ||
existingFuture.isDone())
+ ? CompletableFuture.completedFuture(null)
+ : existingFuture;
+
+ return chain.thenCompose(v ->
+ pulsarService.getPulsarResources().getNamespaceResources()
+ .getPoliciesAsync(topicName.getNamespaceObject())
+ .thenCompose(namespacePolicies -> {
+ if (namespacePolicies.isPresent() &&
namespacePolicies.get().deleted) {
+ log.debug("[{}] skip sending topic policy
event since the namespace is deleted",
+ topicName);
+ return
CompletableFuture.completedFuture(null);
+ }
+ return
getTopicPoliciesAsync(partitionedTopicName,
+ isGlobalPolicy ? GetType.GLOBAL_ONLY :
GetType.LOCAL_ONLY)
+ .thenCompose(currentPolicies -> {
+ if (currentPolicies.isEmpty() &&
skipUpdateWhenTopicPolicyDoesntExist) {
+ log.debug("[{}] No existing
policies, skipping sending event as "
+ + "requested",
topicName);
+ return
CompletableFuture.completedFuture(null);
+ }
+ TopicPolicies policiesToUpdate;
+ if (actionType ==
ActionType.DELETE) {
+ policiesToUpdate = null; //
For delete, policies object is null
} else {
- result.completeExceptionally(
- new
RuntimeException("Got message id is null."));
+ policiesToUpdate =
currentPolicies.isEmpty()
+ ?
createTopicPolicies(isGlobalPolicy)
+ :
currentPolicies.get().clone();
+
policyUpdater.accept(policiesToUpdate);
}
- }
- });
- }
- });
- return result;
- });
+ return
sendTopicPolicyEventInternal(topicName, actionType, policiesToUpdate,
+ isGlobalPolicy);
+ })
+ .thenCompose(messageId -> {
+ if (messageId == null) {
+ return
CompletableFuture.completedFuture(null);
+ } else {
+ // asynchronously wait until
the message ID is read by the reader
+ return
untilMessageIdHasBeenRead(topicName.getNamespaceObject(),
+ messageId);
+ }
+ });
+ }));
+ }).whenComplete((res, ex) -> {
+ // remove the current future from the sequencer map, if it is done
+ // this would remove the future from the sequencer map when the
last operation completes in the chained
+ // future
+ topicPolicyUpdateSequencer.compute(sequencerKey, (key,
existingFuture) -> {
+ if (existingFuture != null && existingFuture.isDone()) {
+ // Remove the completed future from the sequencer map
+ return null;
+ }
+ return existingFuture;
+ });
+ if (ex != null) {
+
writerCaches.synchronous().invalidate(topicName.getNamespaceObject());
+
operationFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
+ } else {
+ operationFuture.complete(res);
+ }
+ });
+ return operationFuture;
+ }
+
+ /**
+ * Asynchronously waits until the message ID has been read by the reader.
+ * This ensures that the write operation has been fully processed and the
changes are effective.
+ * @param namespaceObject the namespace object for which the message ID is
being tracked
+ * @param messageId the message ID to wait for being handled
+ * @return a CompletableFuture that completes when the message ID has been
read by the reader
+ */
+ private CompletableFuture<Void> untilMessageIdHasBeenRead(NamespaceName
namespaceObject, MessageId messageId) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
getMessageHandlerTracker(namespaceObject).addPendingFuture((MessageIdAdv)
messageId, future);
+ return future;
+ }
+
+ private TopicPolicyMessageHandlerTracker
getMessageHandlerTracker(NamespaceName namespaceObject) {
+ return
topicPolicyMessageHandlerTrackers.computeIfAbsent(namespaceObject,
+ ns -> new TopicPolicyMessageHandlerTracker());
+ }
+
+ private record PendingMessageFuture(MessageId messageId,
CompletableFuture<Void> future)
+ implements Comparable<PendingMessageFuture> {
+ @Override
+ public int compareTo(PendingMessageFuture o) {
+ return messageId.compareTo(o.messageId);
+ }
+ }
+
+ /**
+ * This tracks the last handled message IDs for each partition of the
topic policies topic and
+ * pending futures for topic policy messages. Each namespace has its own
tracker instance since
+ * this is tracking the per-namespace __change_events topic.
+ * The purpose for this tracker is to ensure that write operations on
topic policies don't complete before the topic
+ * policies message has been read by the reader and effective.
+ */
+ private static class TopicPolicyMessageHandlerTracker implements
AutoCloseable {
+ private List<MessageIdAdv> lastHandledMessageIds = new ArrayList<>();
+ private List<PriorityQueue<PendingMessageFuture>> pendingFutures = new
ArrayList<>();
Review Comment:
```suggestion
private final List<MessageIdAdv> lastHandledMessageIds = new
ArrayList<>();
private final List<PriorityQueue<PendingMessageFuture>>
pendingFutures = new ArrayList<>();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]