This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a5e5e2dcd5d KAFKA-18706 Move AclPublisher to metadata module (#18802)
a5e5e2dcd5d is described below

commit a5e5e2dcd5dc7986b9e11294bb3bd6c7ecbc94ea
Author: PoAn Yang <[email protected]>
AuthorDate: Sun Mar 9 21:00:33 2025 +0800

    KAFKA-18706 Move AclPublisher to metadata module (#18802)
    
    Move AclPublisher to org.apache.kafka.metadata.publisher package.
    
    Reviewers: Christo Lolov <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   3 +-
 .../main/scala/kafka/server/ControllerServer.scala |   6 +-
 .../scala/kafka/server/metadata/AclPublisher.scala | 102 -------------------
 .../server/metadata/BrokerMetadataPublisher.scala  |   1 +
 .../metadata/BrokerMetadataPublisherTest.scala     |   1 +
 .../kafka/metadata/publisher/AclPublisher.java     | 108 +++++++++++++++++++++
 6 files changed, 115 insertions(+), 106 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 7ff50d175b5..5a65e028877 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,6 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
 import org.apache.kafka.coordinator.transaction.ProducerIdManager
 import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
+import org.apache.kafka.metadata.publisher.AclPublisher
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
@@ -528,7 +529,7 @@ class BrokerServer(
           config.nodeId,
           sharedServer.metadataPublishingFaultHandler,
           "broker",
-          authorizer
+          authorizer.toJava
         ),
         sharedServer.initialBrokerMetadataLoadFaultHandler,
         sharedServer.metadataPublishingFaultHandler
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 9e3439d61c3..5ea02cbe623 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
 import kafka.server.QuotaFactory.QuotaManagers
 
 import scala.collection.immutable
-import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
+import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.network.ListenerName
@@ -37,7 +37,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
 import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
-import org.apache.kafka.metadata.publisher.FeaturesPublisher
+import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.authorizer.Authorizer
@@ -375,7 +375,7 @@ class ControllerServer(
         config.nodeId,
         sharedServer.metadataPublishingFaultHandler,
         "controller",
-        authorizer
+        authorizer.toJava
       ))
 
       // Install all metadata publishers.
diff --git a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala b/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
deleted file mode 100644
index 43fb2058df3..00000000000
--- a/core/src/main/scala/kafka/server/metadata/AclPublisher.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import kafka.utils.Logging
-import org.apache.kafka.image.loader.{LoaderManifest, LoaderManifestType}
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
-import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.fault.FaultHandler
-
-import scala.concurrent.TimeoutException
-
-
-class AclPublisher(
-  nodeId: Int,
-  faultHandler: FaultHandler,
-  nodeType: String,
-  authorizer: Option[Authorizer],
-) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
-  logIdent = s"[${name()}] "
-
-  override def name(): String = s"AclPublisher $nodeType id=$nodeId"
-
-  private var completedInitialLoad = false
-
-  override def onMetadataUpdate(
-    delta: MetadataDelta,
-    newImage: MetadataImage,
-    manifest: LoaderManifest
-  ): Unit = {
-    val deltaName = s"MetadataDelta up to ${newImage.offset()}"
-
-    // Apply changes to ACLs. This needs to be handled carefully because while we are
-    // applying these changes, the Authorizer is continuing to return authorization
-    // results in other threads. We never want to expose an invalid state. For example,
-    // if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo,
-    // we want to apply those changes in that order, not the reverse order! Otherwise
-    // there could be a window during which incorrect authorization results are returned.
-    Option(delta.aclsDelta()).foreach { aclsDelta =>
-      authorizer match {
-        case Some(authorizer: ClusterMetadataAuthorizer) => if (manifest.`type`().equals(LoaderManifestType.SNAPSHOT)) {
-          try {
-            // If the delta resulted from a snapshot load, we want to apply the new changes
-            // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
-            // first snapshot load, it will also complete the futures returned by
-            // Authorizer#start (which we wait for before processing RPCs).
-            info(s"Loading authorizer snapshot at offset ${newImage.offset()}")
-            authorizer.loadSnapshot(newImage.acls().acls())
-          } catch {
-            case t: Throwable => faultHandler.handleFault("Error loading " +
-              s"authorizer snapshot in $deltaName", t)
-          }
-        } else {
-          try {
-            // Because the changes map is a LinkedHashMap, the deltas will be returned in
-            // the order they were performed.
-            aclsDelta.changes().forEach((key, value) =>
-              if (value.isPresent) {
-                authorizer.addAcl(key, value.get())
-              } else {
-                authorizer.removeAcl(key)
-              })
-          } catch {
-            case t: Throwable => faultHandler.handleFault("Error loading " +
-              s"authorizer changes in $deltaName", t)
-          }
-        }
-        if (!completedInitialLoad) {
-          // If we are receiving this onMetadataUpdate call, that means the MetadataLoader has
-          // loaded up to the local high water mark. So we complete the initial load, enabling
-          // the authorizer.
-          completedInitialLoad = true
-          authorizer.completeInitialLoad()
-        }
-        case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do.
-      }
-    }
-  }
-
-  override def close(): Unit = {
-    authorizer match {
-      case Some(authorizer: ClusterMetadataAuthorizer) => authorizer.completeInitialLoad(new TimeoutException)
-      case _ =>
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 1985f04348f..8eb0f45def5 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
+import org.apache.kafka.metadata.publisher.AclPublisher
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.fault.FaultHandler
 
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index a166368a5aa..575d3855dcd 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance}
 import org.apache.kafka.image.loader.LogDeltaManifest
+import org.apache.kafka.metadata.publisher.AclPublisher
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.LeaderAndEpoch
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/publisher/AclPublisher.java b/metadata/src/main/java/org/apache/kafka/metadata/publisher/AclPublisher.java
new file mode 100644
index 00000000000..cbfdb0f68c2
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/publisher/AclPublisher.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.loader.LoaderManifestType;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
+import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.fault.FaultHandler;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.TimeoutException;
+
+public class AclPublisher implements MetadataPublisher {
+    private final Logger log;
+    private final int nodeId;
+    private final FaultHandler faultHandler;
+    private final String nodeType;
+    private final Optional<ClusterMetadataAuthorizer> authorizer;
+    private boolean completedInitialLoad = false;
+
+    public AclPublisher(int nodeId, FaultHandler faultHandler, String nodeType, Optional<Authorizer> authorizer) {
+        this.nodeId = nodeId;
+        this.faultHandler = faultHandler;
+        this.nodeType = nodeType;
+        this.authorizer = authorizer.filter(ClusterMetadataAuthorizer.class::isInstance).map(ClusterMetadataAuthorizer.class::cast);
+        this.log = new LogContext(name()).logger(AclPublisher.class);
+    }
+
+    @Override
+    public final String name() {
+        return "AclPublisher " + nodeType + " id=" + nodeId;
+    }
+
+    @Override
+    public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) {
+        String deltaName = "MetadataDelta up to " + newImage.offset();
+
+        // Apply changes to ACLs. This needs to be handled carefully because while we are
+        // applying these changes, the Authorizer is continuing to return authorization
+        // results in other threads. We never want to expose an invalid state. For example,
+        // if the user created a DENY ALL acl and then created an ALLOW ACL for topic foo,
+        // we want to apply those changes in that order, not the reverse order! Otherwise
+        // there could be a window during which incorrect authorization results are returned.
+        Optional.ofNullable(delta.aclsDelta()).ifPresent(aclsDelta -> {
+            authorizer.ifPresent(clusterMetadataAuthorizer -> {
+                if (manifest.type().equals(LoaderManifestType.SNAPSHOT)) {
+                    try {
+                        // If the delta resulted from a snapshot load, we want to apply the new changes
+                        // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
+                        // first snapshot load, it will also complete the futures returned by
+                        // Authorizer#start (which we wait for before processing RPCs).
+                        log.info("Loading authorizer snapshot at offset {}", newImage.offset());
+                        clusterMetadataAuthorizer.loadSnapshot(newImage.acls().acls());
+                    } catch (Throwable t) {
+                        faultHandler.handleFault("Error loading authorizer snapshot in " + deltaName, t);
+                    }
+                } else {
+                    try {
+                        // Because the changes map is a LinkedHashMap, the deltas will be returned in
+                        // the order they were performed.
+                        aclsDelta.changes().forEach((key, value) -> {
+                            if (value.isPresent()) {
+                                clusterMetadataAuthorizer.addAcl(key, value.get());
+                            } else {
+                                clusterMetadataAuthorizer.removeAcl(key);
+                            }
+                        });
+                    } catch (Throwable t) {
+                        faultHandler.handleFault("Error loading authorizer changes in " + deltaName, t);
+                    }
+                }
+                if (!completedInitialLoad) {
+                    // If we are receiving this onMetadataUpdate call, that means the MetadataLoader has
+                    // loaded up to the local high water mark. So we complete the initial load, enabling
+                    // the authorizer.
+                    completedInitialLoad = true;
+                    clusterMetadataAuthorizer.completeInitialLoad();
+                }
+            });
+        });
+    }
+
+    @Override
+    public void close() {
+        authorizer.ifPresent(clusterMetadataAuthorizer -> clusterMetadataAuthorizer.completeInitialLoad(new TimeoutException()));
+    }
+}
\ No newline at end of file

Reply via email to