This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e8959bd  KAFKA-5994; Log ClusterAuthorizationException for all ClusterAction requests
e8959bd is described below

commit e8959bd766cc6e19f6208fe7ea0a3103cc8fe123
Author: Manikumar Reddy <[email protected]>
AuthorDate: Thu Jan 10 19:18:58 2019 +0530

    KAFKA-5994; Log ClusterAuthorizationException for all ClusterAction requests
    
    Author: Manikumar Reddy <[email protected]>
    
    Reviewers: Rajini Sivaram <[email protected]>
    
    Closes #5021 from omkreddy/KAFKA-5994-CLUSTER-AUTH
---
 core/src/main/scala/kafka/server/KafkaApis.scala | 18 ++++++------------
 1 file changed, 6 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6cf2403..7ad5d72 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -181,10 +181,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    if (!isAuthorizedClusterAction(request)) {
-      sendResponseMaybeThrottle(request, throttleTimeMs => leaderAndIsrRequest.getErrorResponse(throttleTimeMs,
-        Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
-    } else if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch())) {
+    authorizeClusterAction(request)
+    if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch())) {
       // When the broker restarts very quickly, it is possible for this broker to receive request intended
       // for its previous generation so the broker should skip the stale request.
       info("Received LeaderAndIsr request with broker epoch " +
@@ -201,11 +199,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
     // stop serving data to clients for the topic being deleted
     val stopReplicaRequest = request.body[StopReplicaRequest]
-    if (!isAuthorizedClusterAction(request)) {
-      val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
-      sendResponseMaybeThrottle(request, _ =>
-        new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
-    } else if (isBrokerEpochStale(stopReplicaRequest.brokerEpoch())) {
+    authorizeClusterAction(request)
+    if (isBrokerEpochStale(stopReplicaRequest.brokerEpoch())) {
       // When the broker restarts very quickly, it is possible for this broker to receive request intended
       // for its previous generation so the broker should skip the stale request.
       info("Received stop replica request with broker epoch " +
@@ -234,9 +229,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     val correlationId = request.header.correlationId
     val updateMetadataRequest = request.body[UpdateMetadataRequest]
 
-    if (!isAuthorizedClusterAction(request)) {
-      sendResponseMaybeThrottle(request, _ => new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED))
-    } else if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch())) {
+    authorizeClusterAction(request)
+    if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch())) {
       // When the broker restarts very quickly, it is possible for this broker to receive request intended
       // for its previous generation so the broker should skip the stale request.
       info("Received update metadata request with broker epoch " +

Reply via email to