This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c301025  KAFKA-8090: Use automatic RPC generation in ControlledShutdown
c301025 is described below

commit c3010254843dcdf0bd68602f752f690cd76d9813
Author: Mickael Maison <mickael.mai...@gmail.com>
AuthorDate: Thu Apr 4 22:05:04 2019 +0530

    KAFKA-8090: Use automatic RPC generation in ControlledShutdown
    
    Author: Mickael Maison <mickael.mai...@gmail.com>
    
    Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>
    
    Closes #6423 from mimaison/controlled-shutdown
---
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  8 +-
 .../kafka/common/requests/AbstractResponse.java    |  2 +-
 .../common/requests/ControlledShutdownRequest.java | 76 ++++++------------
 .../requests/ControlledShutdownResponse.java       | 90 +++++++---------------
 .../kafka/common/requests/RequestResponseTest.java | 38 +++++++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |  4 +-
 core/src/main/scala/kafka/server/KafkaServer.scala | 12 ++-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  9 ++-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  7 +-
 9 files changed, 110 insertions(+), 136 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index ed9787f..f49c99a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.protocol;
 
+import org.apache.kafka.common.message.ControlledShutdownRequestData;
+import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
@@ -49,8 +51,6 @@ import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
 import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
-import org.apache.kafka.common.requests.ControlledShutdownRequest;
-import org.apache.kafka.common.requests.ControlledShutdownResponse;
 import org.apache.kafka.common.requests.CreateAclsRequest;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
@@ -129,8 +129,8 @@ public enum ApiKeys {
     STOP_REPLICA(5, "StopReplica", true, StopReplicaRequest.schemaVersions(), StopReplicaResponse.schemaVersions()),
     UPDATE_METADATA(6, "UpdateMetadata", true, UpdateMetadataRequest.schemaVersions(),
             UpdateMetadataResponse.schemaVersions()),
-    CONTROLLED_SHUTDOWN(7, "ControlledShutdown", true, ControlledShutdownRequest.schemaVersions(),
-            ControlledShutdownResponse.schemaVersions()),
+    CONTROLLED_SHUTDOWN(7, "ControlledShutdown", true, ControlledShutdownRequestData.SCHEMAS,
+            ControlledShutdownResponseData.SCHEMAS),
     OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequest.schemaVersions(), OffsetCommitResponse.schemaVersions()),
     OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()),
     FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequest.schemaVersions(),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index f74e1ae..f594f20 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -95,7 +95,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
             case STOP_REPLICA:
                 return new StopReplicaResponse(struct);
             case CONTROLLED_SHUTDOWN:
-                return new ControlledShutdownResponse(struct);
+                return new ControlledShutdownResponse(struct, version);
             case UPDATE_METADATA:
                 return new UpdateMetadataResponse(struct);
             case LEADER_AND_ISR:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index ea8ff5f..f89316a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -16,97 +16,68 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.ControlledShutdownRequestData;
+import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
 
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
 
 public class ControlledShutdownRequest extends AbstractRequest {
-    private static final String BROKER_ID_KEY_NAME = "broker_id";
-    private static final String BROKER_EPOCH_KEY_NAME = "broker_epoch";
-
-    private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0 = new Schema(
-            new Field(BROKER_ID_KEY_NAME, INT32, "The id of the broker for 
which controlled shutdown has been requested."));
-    private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = 
CONTROLLED_SHUTDOWN_REQUEST_V0;
-    // Introduce broker_epoch to allow controller to reject stale 
ControlledShutdownRequest
-    private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V2 = new Schema(
-            new Field(BROKER_ID_KEY_NAME, INT32, "The id of the broker for 
which controlled shutdown has been requested."),
-            new Field(BROKER_EPOCH_KEY_NAME, INT64, "The broker epoch"));
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {CONTROLLED_SHUTDOWN_REQUEST_V0, 
CONTROLLED_SHUTDOWN_REQUEST_V1, CONTROLLED_SHUTDOWN_REQUEST_V2};
-    }
 
     public static class Builder extends 
AbstractRequest.Builder<ControlledShutdownRequest> {
-        private final int brokerId;
-        private final long brokerEpoch;
 
-        public Builder(int brokerId, long brokerEpoch, short desiredVersion) {
+        private final ControlledShutdownRequestData data;
+
+        public Builder(ControlledShutdownRequestData data, short desiredVersion) {
             super(ApiKeys.CONTROLLED_SHUTDOWN, desiredVersion);
-            this.brokerId = brokerId;
-            this.brokerEpoch = brokerEpoch;
+            this.data = data;
         }
 
         @Override
         public ControlledShutdownRequest build(short version) {
-            return new ControlledShutdownRequest(brokerId, brokerEpoch, 
version);
+            return new ControlledShutdownRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=ControlledShutdownRequest").
-                append(", brokerId=").append(brokerId).
-                append(", brokerEpoch=").append(brokerEpoch).
-                append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
-    private final int brokerId;
-    private final long brokerEpoch;
 
-    private ControlledShutdownRequest(int brokerId, long brokerEpoch, short 
version) {
+    private final ControlledShutdownRequestData data;
+    private final short version;
+
+    private ControlledShutdownRequest(ControlledShutdownRequestData data, short version) {
         super(ApiKeys.CONTROLLED_SHUTDOWN, version);
-        this.brokerId = brokerId;
-        this.brokerEpoch = brokerEpoch;
+        this.data = data;
+        this.version = version;
     }
 
     public ControlledShutdownRequest(Struct struct, short version) {
         super(ApiKeys.CONTROLLED_SHUTDOWN, version);
-        brokerId = struct.getInt(BROKER_ID_KEY_NAME);
-        brokerEpoch = struct.hasField(BROKER_EPOCH_KEY_NAME) ? 
struct.getLong(BROKER_EPOCH_KEY_NAME) :
-                AbstractControlRequest.UNKNOWN_BROKER_EPOCH;
+        this.data = new ControlledShutdownRequestData(struct, version);
+        this.version = version;
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        ControlledShutdownResponseData response = new ControlledShutdownResponseData();
+        response.setErrorCode(Errors.forException(e).code());
         short versionId = version();
         switch (versionId) {
             case 0:
             case 1:
             case 2:
-                return new ControlledShutdownResponse(Errors.forException(e), 
Collections.emptySet());
+                return new ControlledShutdownResponse(response);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                     versionId, this.getClass().getSimpleName(), 
ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()));
         }
     }
 
-    public int brokerId() {
-        return brokerId;
-    }
-
-    public long brokerEpoch() {
-        return brokerEpoch;
-    }
-
     public static ControlledShutdownRequest parse(ByteBuffer buffer, short 
version) {
         return new ControlledShutdownRequest(
                 ApiKeys.CONTROLLED_SHUTDOWN.parseRequest(version, buffer), 
version);
@@ -114,9 +85,10 @@ public class ControlledShutdownRequest extends AbstractRequest {
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new 
Struct(ApiKeys.CONTROLLED_SHUTDOWN.requestSchema(version()));
-        struct.set(BROKER_ID_KEY_NAME, brokerId);
-        struct.setIfExists(BROKER_EPOCH_KEY_NAME, brokerEpoch);
-        return struct;
+        return data.toStruct(version);
+    }
+
+    public ControlledShutdownRequestData data() {
+        return data;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index 1d62b2d..5448f03 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -17,44 +17,21 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ControlledShutdownResponseData;
+import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
+import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionSet;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
 
 public class ControlledShutdownResponse extends AbstractResponse {
 
-    private static final String PARTITIONS_REMAINING_KEY_NAME = 
"partitions_remaining";
-
-    private static final Schema CONTROLLED_SHUTDOWN_PARTITION_V0 = new Schema(
-            TOPIC_NAME,
-            PARTITION_ID);
-
-    private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema(
-            ERROR_CODE,
-            new Field(PARTITIONS_REMAINING_KEY_NAME, new 
ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0), "The partitions " +
-                    "that the broker still leads."));
-
-    private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = 
CONTROLLED_SHUTDOWN_RESPONSE_V0;
-    private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V2 = 
CONTROLLED_SHUTDOWN_RESPONSE_V1;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{CONTROLLED_SHUTDOWN_RESPONSE_V0, 
CONTROLLED_SHUTDOWN_RESPONSE_V1, CONTROLLED_SHUTDOWN_RESPONSE_V2};
-    }
-
     /**
      * Possible error codes:
      *
@@ -62,58 +39,49 @@ public class ControlledShutdownResponse extends AbstractResponse {
      * BROKER_NOT_AVAILABLE(8)
      * STALE_CONTROLLER_EPOCH(11)
      */
-    private final Errors error;
+    private final ControlledShutdownResponseData data;
 
-    private final Set<TopicPartition> partitionsRemaining;
-
-    public ControlledShutdownResponse(Errors error, Set<TopicPartition> 
partitionsRemaining) {
-        this.error = error;
-        this.partitionsRemaining = partitionsRemaining;
+    public ControlledShutdownResponse(ControlledShutdownResponseData data) {
+        this.data = data;
     }
 
-    public ControlledShutdownResponse(Struct struct) {
-        error = Errors.forCode(struct.get(ERROR_CODE));
-        Set<TopicPartition> partitions = new HashSet<>();
-        for (Object topicPartitionObj : 
struct.getArray(PARTITIONS_REMAINING_KEY_NAME)) {
-            Struct topicPartition = (Struct) topicPartitionObj;
-            String topic = topicPartition.get(TOPIC_NAME);
-            int partition = topicPartition.get(PARTITION_ID);
-            partitions.add(new TopicPartition(topic, partition));
-        }
-        partitionsRemaining = partitions;
+    public ControlledShutdownResponse(Struct struct, short version) {
+        this.data = new ControlledShutdownResponseData(struct, version);
     }
 
     public Errors error() {
-        return error;
+        return Errors.forCode(data.errorCode());
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
-    }
-
-    public Set<TopicPartition> partitionsRemaining() {
-        return partitionsRemaining;
+        return Collections.singletonMap(error(), 1);
     }
 
     public static ControlledShutdownResponse parse(ByteBuffer buffer, short 
version) {
-        return new 
ControlledShutdownResponse(ApiKeys.CONTROLLED_SHUTDOWN.parseResponse(version, 
buffer));
+        return new ControlledShutdownResponse(ApiKeys.CONTROLLED_SHUTDOWN.parseResponse(version, buffer), version);
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new 
Struct(ApiKeys.CONTROLLED_SHUTDOWN.responseSchema(version));
-        struct.set(ERROR_CODE, error.code());
+        return data.toStruct(version);
+    }
 
-        List<Struct> partitionsRemainingList = new 
ArrayList<>(partitionsRemaining.size());
-        for (TopicPartition topicPartition : partitionsRemaining) {
-            Struct topicPartitionStruct = 
struct.instance(PARTITIONS_REMAINING_KEY_NAME);
-            topicPartitionStruct.set(TOPIC_NAME, topicPartition.topic());
-            topicPartitionStruct.set(PARTITION_ID, topicPartition.partition());
-            partitionsRemainingList.add(topicPartitionStruct);
-        }
-        struct.set(PARTITIONS_REMAINING_KEY_NAME, 
partitionsRemainingList.toArray());
+    public ControlledShutdownResponseData data() {
+        return data;
+    }
 
-        return struct;
+    public static ControlledShutdownResponse prepareResponse(Errors error, Set<TopicPartition> tps) {
+        ControlledShutdownResponseData data = new ControlledShutdownResponseData();
+        data.setErrorCode(error.code());
+        ControlledShutdownResponseData.RemainingPartitionSet pSet = new RemainingPartitionSet();
+        tps.forEach(tp -> {
+            pSet.add(new RemainingPartition()
+                    .setTopicName(tp.topic())
+                    .setPartitionIndex(tp.partition()));
+        });
+        data.setRemainingPartitions(pSet);
+        return new ControlledShutdownResponse(data);
     }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 5b92b1b..dfdc323 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -32,6 +32,10 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ControlledShutdownRequestData;
+import org.apache.kafka.common.message.ControlledShutdownResponseData;
+import 
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
+import 
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionSet;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
@@ -606,7 +610,7 @@ public class RequestResponseTest {
         ByteBuffer buffer = toBuffer(struct);
         ControlledShutdownResponse deserialized = 
ControlledShutdownResponse.parse(buffer, version);
         assertEquals(response.error(), deserialized.error());
-        assertEquals(response.partitionsRemaining(), 
deserialized.partitionsRemaining());
+        assertEquals(response.data().remainingPartitions(), 
deserialized.data().remainingPartitions());
     }
 
     @Test(expected = UnsupportedVersionException.class)
@@ -975,19 +979,37 @@ public class RequestResponseTest {
     }
 
     private ControlledShutdownRequest createControlledShutdownRequest() {
-        return new ControlledShutdownRequest.Builder(10, 0, 
ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build();
+        ControlledShutdownRequestData data = new 
ControlledShutdownRequestData()
+                .setBrokerId(10)
+                .setBrokerEpoch(0L);
+        return new ControlledShutdownRequest.Builder(
+                data,
+                ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build();
     }
 
     private ControlledShutdownRequest createControlledShutdownRequest(int 
version) {
-        return new ControlledShutdownRequest.Builder(10, 0, 
ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build((short) version);
+        ControlledShutdownRequestData data = new 
ControlledShutdownRequestData()
+                .setBrokerId(10)
+                .setBrokerEpoch(0L);
+        return new ControlledShutdownRequest.Builder(
+                data,
+                ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build((short) 
version);
     }
 
     private ControlledShutdownResponse createControlledShutdownResponse() {
-        Set<TopicPartition> topicPartitions = Utils.mkSet(
-                new TopicPartition("test2", 5),
-                new TopicPartition("test1", 10)
-        );
-        return new ControlledShutdownResponse(Errors.NONE, topicPartitions);
+        RemainingPartition p1 = new RemainingPartition()
+                .setTopicName("test2")
+                .setPartitionIndex(5);
+        RemainingPartition p2 = new RemainingPartition()
+                .setTopicName("test1")
+                .setPartitionIndex(10);
+        RemainingPartitionSet pSet = new RemainingPartitionSet();
+        pSet.add(p1);
+        pSet.add(p2);
+        ControlledShutdownResponseData data = new 
ControlledShutdownResponseData()
+                .setErrorCode(Errors.NONE.code())
+                .setRemainingPartitions(pSet);
+        return new ControlledShutdownResponse(data);
     }
 
     private LeaderAndIsrRequest createLeaderAndIsrRequest(int version) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fc06162..7d12fe3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -284,14 +284,14 @@ class KafkaApis(val requestChannel: RequestChannel,
     def controlledShutdownCallback(controlledShutdownResult: 
Try[Set[TopicPartition]]): Unit = {
       val response = controlledShutdownResult match {
         case Success(partitionsRemaining) =>
-          new ControlledShutdownResponse(Errors.NONE, 
partitionsRemaining.asJava)
+         ControlledShutdownResponse.prepareResponse(Errors.NONE, partitionsRemaining.asJava)
 
         case Failure(throwable) =>
           controlledShutdownRequest.getErrorResponse(throwable)
       }
       sendResponseExemptThrottle(request, response)
     }
-    controller.controlledShutdown(controlledShutdownRequest.brokerId, 
controlledShutdownRequest.brokerEpoch, controlledShutdownCallback)
+    controller.controlledShutdown(controlledShutdownRequest.data.brokerId, controlledShutdownRequest.data.brokerEpoch, controlledShutdownCallback)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b5ee8cc..2db3839 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -39,6 +39,7 @@ import kafka.utils._
 import kafka.zk.{BrokerInfo, KafkaZkClient}
 import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, 
ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.message.ControlledShutdownRequestData
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
@@ -516,19 +517,22 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
                 else if (config.interBrokerProtocolVersion < KAFKA_2_2_IV0) 1
                 else 2
 
-              val controlledShutdownRequest = new 
ControlledShutdownRequest.Builder(config.brokerId,
-                kafkaController.brokerEpoch, controlledShutdownApiVersion)
+              val controlledShutdownRequest = new ControlledShutdownRequest.Builder(
+                  new ControlledShutdownRequestData()
+                    .setBrokerId(config.brokerId)
+                    .setBrokerEpoch(kafkaController.brokerEpoch),
+                    controlledShutdownApiVersion)
               val request = 
networkClient.newClientRequest(node(prevController).idString, 
controlledShutdownRequest,
                 time.milliseconds(), true)
               val clientResponse = 
NetworkClientUtils.sendAndReceive(networkClient, request, time)
 
               val shutdownResponse = 
clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
-              if (shutdownResponse.error == Errors.NONE && 
shutdownResponse.partitionsRemaining.isEmpty) {
+              if (shutdownResponse.error == Errors.NONE && shutdownResponse.data.remainingPartitions.isEmpty) {
                 shutdownSucceeded = true
                 info("Controlled shutdown succeeded")
               }
               else {
-                info(s"Remaining partitions to move: 
${shutdownResponse.partitionsRemaining.asScala.mkString(",")}")
+                info(s"Remaining partitions to move: ${shutdownResponse.data.remainingPartitions}")
                 info(s"Error from controller: ${shutdownResponse.error}")
               }
             }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 11ee87c..a7cb654 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
-import org.apache.kafka.common.message.{CreateTopicsRequestData, 
DeleteTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, 
LeaveGroupRequestData}
+import org.apache.kafka.common.message.{ControlledShutdownRequestData, CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData}
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicSet}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -362,8 +362,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def stopReplicaRequest = new 
StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion, brokerId, 
Int.MaxValue, Long.MaxValue, true, Set(tp).asJava).build()
 
-  private def controlledShutdownRequest = new 
requests.ControlledShutdownRequest.Builder(brokerId, Long.MaxValue,
-    ApiKeys.CONTROLLED_SHUTDOWN.latestVersion).build()
+  private def controlledShutdownRequest = new ControlledShutdownRequest.Builder(
+      new ControlledShutdownRequestData()
+        .setBrokerId(brokerId)
+        .setBrokerEpoch(Long.MaxValue),
+      ApiKeys.CONTROLLED_SHUTDOWN.latestVersion).build()
 
   private def createTopicsRequest =
     new CreateTopicsRequest.Builder(new CreateTopicsRequestData().setTopics(
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 28f7e07..671f9e0 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message.{CreateTopicsRequestData, 
DeleteTopicsRequestData, DescribeGroupsRequestData, 
ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData}
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.message.ControlledShutdownRequestData
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicSet}
 import org.apache.kafka.common.message.SaslAuthenticateRequestData
 import org.apache.kafka.common.message.SaslHandshakeRequestData
@@ -244,7 +245,11 @@ class RequestQuotaTest extends BaseRequestTest {
           new 
UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, brokerId, 
Int.MaxValue, Long.MaxValue, partitionState, brokers)
 
         case ApiKeys.CONTROLLED_SHUTDOWN =>
-          new ControlledShutdownRequest.Builder(brokerId, Long.MaxValue, 
ApiKeys.CONTROLLED_SHUTDOWN.latestVersion)
+          new ControlledShutdownRequest.Builder(
+              new ControlledShutdownRequestData()
+                .setBrokerId(brokerId)
+                .setBrokerEpoch(Long.MaxValue),
+              ApiKeys.CONTROLLED_SHUTDOWN.latestVersion)
 
         case ApiKeys.OFFSET_COMMIT =>
           new OffsetCommitRequest.Builder("test-group",

Reply via email to