This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e8959bd KAFKA-5994; Log ClusterAuthorizationException for all
ClusterAction requests
e8959bd is described below
commit e8959bd766cc6e19f6208fe7ea0a3103cc8fe123
Author: Manikumar Reddy <[email protected]>
AuthorDate: Thu Jan 10 19:18:58 2019 +0530
KAFKA-5994; Log ClusterAuthorizationException for all ClusterAction requests
Author: Manikumar Reddy <[email protected]>
Reviewers: Rajini Sivaram <[email protected]>
Closes #5021 from omkreddy/KAFKA-5994-CLUSTER-AUTH
---
core/src/main/scala/kafka/server/KafkaApis.scala | 18 ++++++------------
1 file changed, 6 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6cf2403..7ad5d72 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -181,10 +181,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- if (!isAuthorizedClusterAction(request)) {
- sendResponseMaybeThrottle(request, throttleTimeMs =>
leaderAndIsrRequest.getErrorResponse(throttleTimeMs,
- Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
- } else if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch())) {
+ authorizeClusterAction(request)
+ if (isBrokerEpochStale(leaderAndIsrRequest.brokerEpoch())) {
// When the broker restarts very quickly, it is possible for this broker
to receive request intended
// for its previous generation so the broker should skip the stale
request.
info("Received LeaderAndIsr request with broker epoch " +
@@ -201,11 +199,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// We can't have the ensureTopicExists check here since the controller
sends it as an advisory to all brokers so they
// stop serving data to clients for the topic being deleted
val stopReplicaRequest = request.body[StopReplicaRequest]
- if (!isAuthorizedClusterAction(request)) {
- val result = stopReplicaRequest.partitions.asScala.map((_,
Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
- sendResponseMaybeThrottle(request, _ =>
- new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED,
result.asJava))
- } else if (isBrokerEpochStale(stopReplicaRequest.brokerEpoch())) {
+ authorizeClusterAction(request)
+ if (isBrokerEpochStale(stopReplicaRequest.brokerEpoch())) {
// When the broker restarts very quickly, it is possible for this broker
to receive request intended
// for its previous generation so the broker should skip the stale
request.
info("Received stop replica request with broker epoch " +
@@ -234,9 +229,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val correlationId = request.header.correlationId
val updateMetadataRequest = request.body[UpdateMetadataRequest]
- if (!isAuthorizedClusterAction(request)) {
- sendResponseMaybeThrottle(request, _ => new
UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED))
- } else if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch())) {
+ authorizeClusterAction(request)
+ if (isBrokerEpochStale(updateMetadataRequest.brokerEpoch())) {
// When the broker restarts very quickly, it is possible for this broker
to receive request intended
// for its previous generation so the broker should skip the stale
request.
info("Received update metadata request with broker epoch " +