This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c301025 KAFKA-8090: Use automatic RPC generation in ControlledShutdown c301025 is described below commit c3010254843dcdf0bd68602f752f690cd76d9813 Author: Mickael Maison <mickael.mai...@gmail.com> AuthorDate: Thu Apr 4 22:05:04 2019 +0530 KAFKA-8090: Use automatic RPC generation in ControlledShutdown Author: Mickael Maison <mickael.mai...@gmail.com> Reviewers: Manikumar Reddy <manikumar.re...@gmail.com> Closes #6423 from mimaison/controlled-shutdown --- .../org/apache/kafka/common/protocol/ApiKeys.java | 8 +- .../kafka/common/requests/AbstractResponse.java | 2 +- .../common/requests/ControlledShutdownRequest.java | 76 ++++++------------ .../requests/ControlledShutdownResponse.java | 90 +++++++--------------- .../kafka/common/requests/RequestResponseTest.java | 38 +++++++-- core/src/main/scala/kafka/server/KafkaApis.scala | 4 +- core/src/main/scala/kafka/server/KafkaServer.scala | 12 ++- .../kafka/api/AuthorizerIntegrationTest.scala | 9 ++- .../scala/unit/kafka/server/RequestQuotaTest.scala | 7 +- 9 files changed, 110 insertions(+), 136 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index ed9787f..f49c99a 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.protocol; +import org.apache.kafka.common.message.ControlledShutdownRequestData; +import org.apache.kafka.common.message.ControlledShutdownResponseData; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.DeleteTopicsRequestData; @@ -49,8 +51,6 @@ import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest; import 
org.apache.kafka.common.requests.AlterReplicaLogDirsResponse; import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; -import org.apache.kafka.common.requests.ControlledShutdownRequest; -import org.apache.kafka.common.requests.ControlledShutdownResponse; import org.apache.kafka.common.requests.CreateAclsRequest; import org.apache.kafka.common.requests.CreateAclsResponse; import org.apache.kafka.common.requests.CreateDelegationTokenRequest; @@ -129,8 +129,8 @@ public enum ApiKeys { STOP_REPLICA(5, "StopReplica", true, StopReplicaRequest.schemaVersions(), StopReplicaResponse.schemaVersions()), UPDATE_METADATA(6, "UpdateMetadata", true, UpdateMetadataRequest.schemaVersions(), UpdateMetadataResponse.schemaVersions()), - CONTROLLED_SHUTDOWN(7, "ControlledShutdown", true, ControlledShutdownRequest.schemaVersions(), - ControlledShutdownResponse.schemaVersions()), + CONTROLLED_SHUTDOWN(7, "ControlledShutdown", true, ControlledShutdownRequestData.SCHEMAS, + ControlledShutdownResponseData.SCHEMAS), OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequest.schemaVersions(), OffsetCommitResponse.schemaVersions()), OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()), FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequest.schemaVersions(), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index f74e1ae..f594f20 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -95,7 +95,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { case STOP_REPLICA: return new StopReplicaResponse(struct); case CONTROLLED_SHUTDOWN: - return new ControlledShutdownResponse(struct); + return new 
ControlledShutdownResponse(struct, version); case UPDATE_METADATA: return new UpdateMetadataResponse(struct); case LEADER_AND_ISR: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java index ea8ff5f..f89316a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java @@ -16,97 +16,68 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.ControlledShutdownRequestData; +import org.apache.kafka.common.message.ControlledShutdownResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import java.util.Collections; -import static org.apache.kafka.common.protocol.types.Type.INT32; -import static org.apache.kafka.common.protocol.types.Type.INT64; public class ControlledShutdownRequest extends AbstractRequest { - private static final String BROKER_ID_KEY_NAME = "broker_id"; - private static final String BROKER_EPOCH_KEY_NAME = "broker_epoch"; - - private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V0 = new Schema( - new Field(BROKER_ID_KEY_NAME, INT32, "The id of the broker for which controlled shutdown has been requested.")); - private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = CONTROLLED_SHUTDOWN_REQUEST_V0; - // Introduce broker_epoch to allow controller to reject stale ControlledShutdownRequest - private static final Schema CONTROLLED_SHUTDOWN_REQUEST_V2 = new Schema( - new Field(BROKER_ID_KEY_NAME, INT32, "The id of the broker for which controlled shutdown has been requested."), - new Field(BROKER_EPOCH_KEY_NAME, INT64, 
"The broker epoch")); - - public static Schema[] schemaVersions() { - return new Schema[] {CONTROLLED_SHUTDOWN_REQUEST_V0, CONTROLLED_SHUTDOWN_REQUEST_V1, CONTROLLED_SHUTDOWN_REQUEST_V2}; - } public static class Builder extends AbstractRequest.Builder<ControlledShutdownRequest> { - private final int brokerId; - private final long brokerEpoch; - public Builder(int brokerId, long brokerEpoch, short desiredVersion) { + private final ControlledShutdownRequestData data; + + public Builder(ControlledShutdownRequestData data, short desiredVersion) { super(ApiKeys.CONTROLLED_SHUTDOWN, desiredVersion); - this.brokerId = brokerId; - this.brokerEpoch = brokerEpoch; + this.data = data; } @Override public ControlledShutdownRequest build(short version) { - return new ControlledShutdownRequest(brokerId, brokerEpoch, version); + return new ControlledShutdownRequest(data, version); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=ControlledShutdownRequest"). - append(", brokerId=").append(brokerId). - append(", brokerEpoch=").append(brokerEpoch). - append(")"); - return bld.toString(); + return data.toString(); } } - private final int brokerId; - private final long brokerEpoch; - private ControlledShutdownRequest(int brokerId, long brokerEpoch, short version) { + private final ControlledShutdownRequestData data; + private final short version; + + private ControlledShutdownRequest(ControlledShutdownRequestData data, short version) { super(ApiKeys.CONTROLLED_SHUTDOWN, version); - this.brokerId = brokerId; - this.brokerEpoch = brokerEpoch; + this.data = data; + this.version = version; } public ControlledShutdownRequest(Struct struct, short version) { super(ApiKeys.CONTROLLED_SHUTDOWN, version); - brokerId = struct.getInt(BROKER_ID_KEY_NAME); - brokerEpoch = struct.hasField(BROKER_EPOCH_KEY_NAME) ? 
struct.getLong(BROKER_EPOCH_KEY_NAME) : - AbstractControlRequest.UNKNOWN_BROKER_EPOCH; + this.data = new ControlledShutdownRequestData(struct, version); + this.version = version; } @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + ControlledShutdownResponseData response = new ControlledShutdownResponseData(); + response.setErrorCode(Errors.forException(e).code()); short versionId = version(); switch (versionId) { case 0: case 1: case 2: - return new ControlledShutdownResponse(Errors.forException(e), Collections.emptySet()); + return new ControlledShutdownResponse(response); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ApiKeys.CONTROLLED_SHUTDOWN.latestVersion())); } } - public int brokerId() { - return brokerId; - } - - public long brokerEpoch() { - return brokerEpoch; - } - public static ControlledShutdownRequest parse(ByteBuffer buffer, short version) { return new ControlledShutdownRequest( ApiKeys.CONTROLLED_SHUTDOWN.parseRequest(version, buffer), version); @@ -114,9 +85,10 @@ public class ControlledShutdownRequest extends AbstractRequest { @Override protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN.requestSchema(version())); - struct.set(BROKER_ID_KEY_NAME, brokerId); - struct.setIfExists(BROKER_EPOCH_KEY_NAME, brokerEpoch); - return struct; + return data.toStruct(version); + } + + public ControlledShutdownRequestData data() { + return data; } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java index 1d62b2d..5448f03 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java @@ -17,44 +17,21 @@ package 
org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ControlledShutdownResponseData; +import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition; +import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionSet; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; +import java.util.Collections; import java.util.Map; import java.util.Set; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; public class ControlledShutdownResponse extends AbstractResponse { - private static final String PARTITIONS_REMAINING_KEY_NAME = "partitions_remaining"; - - private static final Schema CONTROLLED_SHUTDOWN_PARTITION_V0 = new Schema( - TOPIC_NAME, - PARTITION_ID); - - private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema( - ERROR_CODE, - new Field(PARTITIONS_REMAINING_KEY_NAME, new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0), "The partitions " + - "that the broker still leads.")); - - private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = CONTROLLED_SHUTDOWN_RESPONSE_V0; - private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V2 = CONTROLLED_SHUTDOWN_RESPONSE_V1; - - public static Schema[] schemaVersions() { - return new Schema[]{CONTROLLED_SHUTDOWN_RESPONSE_V0, CONTROLLED_SHUTDOWN_RESPONSE_V1, CONTROLLED_SHUTDOWN_RESPONSE_V2}; - } - /** * Possible error codes: * @@ -62,58 +39,49 @@ public class 
ControlledShutdownResponse extends AbstractResponse { * BROKER_NOT_AVAILABLE(8) * STALE_CONTROLLER_EPOCH(11) */ - private final Errors error; + private final ControlledShutdownResponseData data; - private final Set<TopicPartition> partitionsRemaining; - - public ControlledShutdownResponse(Errors error, Set<TopicPartition> partitionsRemaining) { - this.error = error; - this.partitionsRemaining = partitionsRemaining; + public ControlledShutdownResponse(ControlledShutdownResponseData data) { + this.data = data; } - public ControlledShutdownResponse(Struct struct) { - error = Errors.forCode(struct.get(ERROR_CODE)); - Set<TopicPartition> partitions = new HashSet<>(); - for (Object topicPartitionObj : struct.getArray(PARTITIONS_REMAINING_KEY_NAME)) { - Struct topicPartition = (Struct) topicPartitionObj; - String topic = topicPartition.get(TOPIC_NAME); - int partition = topicPartition.get(PARTITION_ID); - partitions.add(new TopicPartition(topic, partition)); - } - partitionsRemaining = partitions; + public ControlledShutdownResponse(Struct struct, short version) { + this.data = new ControlledShutdownResponseData(struct, version); } public Errors error() { - return error; + return Errors.forCode(data.errorCode()); } @Override public Map<Errors, Integer> errorCounts() { - return errorCounts(error); - } - - public Set<TopicPartition> partitionsRemaining() { - return partitionsRemaining; + return Collections.singletonMap(error(), 1); } public static ControlledShutdownResponse parse(ByteBuffer buffer, short version) { - return new ControlledShutdownResponse(ApiKeys.CONTROLLED_SHUTDOWN.parseResponse(version, buffer)); + return new ControlledShutdownResponse(ApiKeys.CONTROLLED_SHUTDOWN.parseResponse(version, buffer), version); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.CONTROLLED_SHUTDOWN.responseSchema(version)); - struct.set(ERROR_CODE, error.code()); + return data.toStruct(version); + } - List<Struct> partitionsRemainingList = 
new ArrayList<>(partitionsRemaining.size()); - for (TopicPartition topicPartition : partitionsRemaining) { - Struct topicPartitionStruct = struct.instance(PARTITIONS_REMAINING_KEY_NAME); - topicPartitionStruct.set(TOPIC_NAME, topicPartition.topic()); - topicPartitionStruct.set(PARTITION_ID, topicPartition.partition()); - partitionsRemainingList.add(topicPartitionStruct); - } - struct.set(PARTITIONS_REMAINING_KEY_NAME, partitionsRemainingList.toArray()); + public ControlledShutdownResponseData data() { + return data; + } - return struct; + public static ControlledShutdownResponse prepareResponse(Errors error, Set<TopicPartition> tps) { + ControlledShutdownResponseData data = new ControlledShutdownResponseData(); + data.setErrorCode(error.code()); + ControlledShutdownResponseData.RemainingPartitionSet pSet = new RemainingPartitionSet(); + tps.forEach(tp -> { + pSet.add(new RemainingPartition() + .setTopicName(tp.topic()) + .setPartitionIndex(tp.partition())); + }); + data.setRemainingPartitions(pSet); + return new ControlledShutdownResponse(data); } + } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 5b92b1b..dfdc323 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -32,6 +32,10 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.ControlledShutdownRequestData; +import org.apache.kafka.common.message.ControlledShutdownResponseData; +import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition; +import 
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionSet; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; @@ -606,7 +610,7 @@ public class RequestResponseTest { ByteBuffer buffer = toBuffer(struct); ControlledShutdownResponse deserialized = ControlledShutdownResponse.parse(buffer, version); assertEquals(response.error(), deserialized.error()); - assertEquals(response.partitionsRemaining(), deserialized.partitionsRemaining()); + assertEquals(response.data().remainingPartitions(), deserialized.data().remainingPartitions()); } @Test(expected = UnsupportedVersionException.class) @@ -975,19 +979,37 @@ public class RequestResponseTest { } private ControlledShutdownRequest createControlledShutdownRequest() { - return new ControlledShutdownRequest.Builder(10, 0, ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build(); + ControlledShutdownRequestData data = new ControlledShutdownRequestData() + .setBrokerId(10) + .setBrokerEpoch(0L); + return new ControlledShutdownRequest.Builder( + data, + ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build(); } private ControlledShutdownRequest createControlledShutdownRequest(int version) { - return new ControlledShutdownRequest.Builder(10, 0, ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build((short) version); + ControlledShutdownRequestData data = new ControlledShutdownRequestData() + .setBrokerId(10) + .setBrokerEpoch(0L); + return new ControlledShutdownRequest.Builder( + data, + ApiKeys.CONTROLLED_SHUTDOWN.latestVersion()).build((short) version); } private ControlledShutdownResponse createControlledShutdownResponse() { - Set<TopicPartition> topicPartitions = Utils.mkSet( - new TopicPartition("test2", 5), - new TopicPartition("test1", 10) - ); - return new ControlledShutdownResponse(Errors.NONE, topicPartitions); + 
RemainingPartition p1 = new RemainingPartition() + .setTopicName("test2") + .setPartitionIndex(5); + RemainingPartition p2 = new RemainingPartition() + .setTopicName("test1") + .setPartitionIndex(10); + RemainingPartitionSet pSet = new RemainingPartitionSet(); + pSet.add(p1); + pSet.add(p2); + ControlledShutdownResponseData data = new ControlledShutdownResponseData() + .setErrorCode(Errors.NONE.code()) + .setRemainingPartitions(pSet); + return new ControlledShutdownResponse(data); } private LeaderAndIsrRequest createLeaderAndIsrRequest(int version) { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fc06162..7d12fe3 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -284,14 +284,14 @@ class KafkaApis(val requestChannel: RequestChannel, def controlledShutdownCallback(controlledShutdownResult: Try[Set[TopicPartition]]): Unit = { val response = controlledShutdownResult match { case Success(partitionsRemaining) => - new ControlledShutdownResponse(Errors.NONE, partitionsRemaining.asJava) + ControlledShutdownResponse.prepareResponse(Errors.NONE, partitionsRemaining.asJava) case Failure(throwable) => controlledShutdownRequest.getErrorResponse(throwable) } sendResponseExemptThrottle(request, response) } - controller.controlledShutdown(controlledShutdownRequest.brokerId, controlledShutdownRequest.brokerEpoch, controlledShutdownCallback) + controller.controlledShutdown(controlledShutdownRequest.data.brokerId, controlledShutdownRequest.data.brokerEpoch, controlledShutdownCallback) } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b5ee8cc..2db3839 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -39,6 +39,7 @@ import kafka.utils._ import kafka.zk.{BrokerInfo, KafkaZkClient} import 
org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils} import org.apache.kafka.common.internals.ClusterResourceListeners +import org.apache.kafka.common.message.ControlledShutdownRequestData import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _} import org.apache.kafka.common.network._ import org.apache.kafka.common.protocol.Errors @@ -516,19 +517,22 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP else if (config.interBrokerProtocolVersion < KAFKA_2_2_IV0) 1 else 2 - val controlledShutdownRequest = new ControlledShutdownRequest.Builder(config.brokerId, - kafkaController.brokerEpoch, controlledShutdownApiVersion) + val controlledShutdownRequest = new ControlledShutdownRequest.Builder( + new ControlledShutdownRequestData() + .setBrokerId(config.brokerId) + .setBrokerEpoch(kafkaController.brokerEpoch), + controlledShutdownApiVersion) val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest, time.milliseconds(), true) val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time) val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse] - if (shutdownResponse.error == Errors.NONE && shutdownResponse.partitionsRemaining.isEmpty) { + if (shutdownResponse.error == Errors.NONE && shutdownResponse.data.remainingPartitions.isEmpty) { shutdownSucceeded = true info("Controlled shutdown succeeded") } else { - info(s"Remaining partitions to move: ${shutdownResponse.partitionsRemaining.asScala.mkString(",")}") + info(s"Remaining partitions to move: ${shutdownResponse.data.remainingPartitions}") info(s"Error from controller: ${shutdownResponse.error}") } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 11ee87c..a7cb654 100644 --- 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -33,7 +33,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME -import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData} +import org.apache.kafka.common.message.{ControlledShutdownRequestData, CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData} import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -362,8 +362,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def stopReplicaRequest = new StopReplicaRequest.Builder(ApiKeys.STOP_REPLICA.latestVersion, brokerId, Int.MaxValue, Long.MaxValue, true, Set(tp).asJava).build() - private def controlledShutdownRequest = new requests.ControlledShutdownRequest.Builder(brokerId, Long.MaxValue, - ApiKeys.CONTROLLED_SHUTDOWN.latestVersion).build() + private def controlledShutdownRequest = new ControlledShutdownRequest.Builder( + new ControlledShutdownRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(Long.MaxValue), + ApiKeys.CONTROLLED_SHUTDOWN.latestVersion).build() private def createTopicsRequest = new CreateTopicsRequest.Builder(new CreateTopicsRequestData().setTopics( diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 28f7e07..671f9e0 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -27,6 +27,7 @@ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType} import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.message.ControlledShutdownRequestData import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet} import org.apache.kafka.common.message.SaslAuthenticateRequestData import org.apache.kafka.common.message.SaslHandshakeRequestData @@ -244,7 +245,11 @@ class RequestQuotaTest extends BaseRequestTest { new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, brokerId, Int.MaxValue, Long.MaxValue, partitionState, brokers) case ApiKeys.CONTROLLED_SHUTDOWN => - new ControlledShutdownRequest.Builder(brokerId, Long.MaxValue, ApiKeys.CONTROLLED_SHUTDOWN.latestVersion) + new ControlledShutdownRequest.Builder( + new ControlledShutdownRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(Long.MaxValue), + ApiKeys.CONTROLLED_SHUTDOWN.latestVersion) case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest.Builder("test-group",