This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 855f899 KAFKA-8256; Replace Heartbeat request/response with automated protocol (#6691) 855f899 is described below commit 855f899bb523f3b334f711926a7db4cc75ebb4b4 Author: Mickael Maison <mimai...@users.noreply.github.com> AuthorDate: Thu May 16 21:08:49 2019 +0100 KAFKA-8256; Replace Heartbeat request/response with automated protocol (#6691) Reviewers: Boyang Chen <bche...@outlook.com>, Jason Gustafson <ja...@confluent.io> --- .../consumer/internals/AbstractCoordinator.java | 6 +- .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../kafka/common/requests/AbstractResponse.java | 2 +- .../kafka/common/requests/HeartbeatRequest.java | 82 +++++----------------- .../kafka/common/requests/HeartbeatResponse.java | 51 ++++---------- .../kafka/clients/consumer/KafkaConsumerTest.java | 7 +- .../internals/AbstractCoordinatorTest.java | 3 +- .../internals/ConsumerCoordinatorTest.java | 3 +- .../internals/ConsumerNetworkClientTest.java | 9 ++- .../kafka/common/requests/RequestResponseTest.java | 9 ++- core/src/main/scala/kafka/server/KafkaApis.scala | 18 +++-- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +- 13 files changed, 76 insertions(+), 126 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 63a7b7c..a1e15f8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.LeaveGroupRequestData; @@ -891,7 +892,10 @@ public abstract class AbstractCoordinator implements Closeable { synchronized RequestFuture<Void> sendHeartbeatRequest() { log.debug("Sending Heartbeat request to coordinator {}", coordinator); HeartbeatRequest.Builder requestBuilder = - new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId); + new HeartbeatRequest.Builder(new HeartbeatRequestData() + .setGroupId(groupId) + .setGenerationid(this.generation.generationId) + .setMemberId(this.generation.memberId)); return client.send(coordinator, requestBuilder) .compose(new HeartbeatResponseHandler()); } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index a7e3757..6a16578 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -28,6 +28,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.FindCoordinatorResponseData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; import org.apache.kafka.common.message.InitProducerIdRequestData; @@ -87,8 +89,6 @@ import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; -import org.apache.kafka.common.requests.HeartbeatRequest; -import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.LeaderAndIsrRequest; import org.apache.kafka.common.requests.LeaderAndIsrResponse; import org.apache.kafka.common.requests.ListGroupsRequest; @@ -138,7 +138,7 @@ public enum ApiKeys { FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequestData.SCHEMAS, FindCoordinatorResponseData.SCHEMAS), JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS), - HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()), + HEARTBEAT(12, "Heartbeat", HeartbeatRequestData.SCHEMAS, HeartbeatResponseData.SCHEMAS), LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS), SYNC_GROUP(14, "SyncGroup", SyncGroupRequestData.SCHEMAS, SyncGroupResponseData.SCHEMAS), DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequestData.SCHEMAS, diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index a7f5a38..32402e4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -87,7 +87,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { case JOIN_GROUP: return new JoinGroupResponse(struct, version); case HEARTBEAT: - return new HeartbeatResponse(struct); + return new HeartbeatResponse(struct, version); case LEAVE_GROUP: return new LeaveGroupResponse(struct, version); case SYNC_GROUP: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index 9a13147..f78cafe 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -16,120 +16,72 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID; -import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; -import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID; public class HeartbeatRequest extends AbstractRequest { - private static final Schema HEARTBEAT_REQUEST_V0 = new Schema( - GROUP_ID, - GENERATION_ID, - MEMBER_ID); - - /* v1 request is the same as v0. Throttle time has been added to response */ - private static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0; - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema HEARTBEAT_REQUEST_V2 = HEARTBEAT_REQUEST_V1; - - public static Schema[] schemaVersions() { - return new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1, - HEARTBEAT_REQUEST_V2}; - } public static class Builder extends AbstractRequest.Builder<HeartbeatRequest> { - private final String groupId; - private final int groupGenerationId; - private final String memberId; + private final HeartbeatRequestData data; - public Builder(String groupId, int groupGenerationId, String memberId) { + public Builder(HeartbeatRequestData data) { super(ApiKeys.HEARTBEAT); - this.groupId = groupId; - this.groupGenerationId = groupGenerationId; - this.memberId = memberId; + this.data = data; } @Override public HeartbeatRequest build(short version) { - return new HeartbeatRequest(groupId, groupGenerationId, memberId, version); + return new HeartbeatRequest(data, version); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=HeartbeatRequest"). - append(", groupId=").append(groupId). - append(", groupGenerationId=").append(groupGenerationId). - append(", memberId=").append(memberId). - append(")"); - return bld.toString(); + return data.toString(); } } - private final String groupId; - private final int groupGenerationId; - private final String memberId; + public final HeartbeatRequestData data; - private HeartbeatRequest(String groupId, int groupGenerationId, String memberId, short version) { + private HeartbeatRequest(HeartbeatRequestData data, short version) { super(ApiKeys.HEARTBEAT, version); - this.groupId = groupId; - this.groupGenerationId = groupGenerationId; - this.memberId = memberId; + this.data = data; } public HeartbeatRequest(Struct struct, short version) { super(ApiKeys.HEARTBEAT, version); - groupId = struct.get(GROUP_ID); - groupGenerationId = struct.get(GENERATION_ID); - memberId = struct.get(MEMBER_ID); + this.data = new HeartbeatRequestData(struct, version); } @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + HeartbeatResponseData response = new HeartbeatResponseData(); + response.setErrorCode(Errors.forException(e).code()); short versionId = version(); switch (versionId) { case 0: - return new HeartbeatResponse(Errors.forException(e)); + return new HeartbeatResponse(response); case 1: case 2: - return new HeartbeatResponse(throttleTimeMs, Errors.forException(e)); + response.setThrottleTimeMs(throttleTimeMs); + return new HeartbeatResponse(response); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ApiKeys.HEARTBEAT.latestVersion())); } } - public String groupId() { - return groupId; - } - - public int groupGenerationId() { - return groupGenerationId; - } - - public String memberId() { - return memberId; - } - public static HeartbeatRequest parse(ByteBuffer buffer, short version) { return new HeartbeatRequest(ApiKeys.HEARTBEAT.parseRequest(version, buffer), version); } @Override protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.HEARTBEAT.requestSchema(version())); - struct.set(GROUP_ID, groupId); - struct.set(GENERATION_ID, groupGenerationId); - struct.set(MEMBER_ID, memberId); - return struct; + return data.toStruct(version()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java index efe2ed8..cc36a20 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java @@ -16,35 +16,18 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; public class HeartbeatResponse extends AbstractResponse { - private static final Schema HEARTBEAT_RESPONSE_V0 = new Schema( - ERROR_CODE); - private static final Schema HEARTBEAT_RESPONSE_V1 = new Schema( - THROTTLE_TIME_MS, - ERROR_CODE); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema HEARTBEAT_RESPONSE_V2 = HEARTBEAT_RESPONSE_V1; - - public static Schema[] schemaVersions() { - return new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1, - HEARTBEAT_RESPONSE_V2}; - } - /** * Possible error codes: * @@ -55,47 +38,37 @@ public class HeartbeatResponse extends AbstractResponse { * REBALANCE_IN_PROGRESS (27) * GROUP_AUTHORIZATION_FAILED (30) */ - private final Errors error; - private final int throttleTimeMs; - - public HeartbeatResponse(Errors error) { - this(DEFAULT_THROTTLE_TIME, error); - } + private final HeartbeatResponseData data; - public HeartbeatResponse(int throttleTimeMs, Errors error) { - this.throttleTimeMs = throttleTimeMs; - this.error = error; + public HeartbeatResponse(HeartbeatResponseData data) { + this.data = data; } - public HeartbeatResponse(Struct struct) { - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); - error = Errors.forCode(struct.get(ERROR_CODE)); + public HeartbeatResponse(Struct struct, short version) { + this.data = new HeartbeatResponseData(struct, version); } @Override public int throttleTimeMs() { - return throttleTimeMs; + return data.throttleTimeMs(); } public Errors error() { - return error; + return Errors.forCode(data.errorCode()); } @Override public Map<Errors, Integer> errorCounts() { - return errorCounts(error); + return Collections.singletonMap(error(), 1); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.HEARTBEAT.responseSchema(version)); - struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); - struct.set(ERROR_CODE, error.code()); - return struct; + return data.toStruct(version); } public static HeartbeatResponse parse(ByteBuffer buffer, short version) { - return new HeartbeatResponse(ApiKeys.HEARTBEAT.parseResponse(version, buffer)); + return new HeartbeatResponse(ApiKeys.HEARTBEAT.parseResponse(version, buffer), version); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 09eb94a..606b711 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData; @@ -1440,7 +1441,9 @@ public class KafkaConsumerTest { public boolean matches(AbstractRequest body) { return true; } - }, new HeartbeatResponse(Errors.REBALANCE_IN_PROGRESS), coordinator); + }, new HeartbeatResponse( + new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())), + coordinator); // join group final ByteBuffer byteBuffer = ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(singletonList(topic))); @@ -1713,7 +1716,7 @@ public class KafkaConsumerTest { heartbeatReceived.set(true); return true; } - }, new HeartbeatResponse(Errors.NONE), coordinator); + }, new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(Errors.NONE.code())), coordinator); return heartbeatReceived; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 449c58f..c8be163 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -773,7 +774,7 @@ public class AbstractCoordinatorTest { } private HeartbeatResponse heartbeatResponse(Errors error) { - return new HeartbeatResponse(error); + return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code())); } private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index d8b1252..958e440 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData; @@ -2189,7 +2190,7 @@ public class ConsumerCoordinatorTest { } private HeartbeatResponse heartbeatResponse(Errors error) { - return new HeartbeatResponse(error); + return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code())); } private JoinGroupResponse joinGroupLeaderResponse(int generationId, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 1b7f8fb..f3750aa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -30,6 +30,8 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; @@ -378,11 +380,14 @@ public class ConsumerNetworkClientTest { } private HeartbeatRequest.Builder heartbeat() { - return new HeartbeatRequest.Builder("group", 1, "memberId"); + return new HeartbeatRequest.Builder(new HeartbeatRequestData() + .setGroupId("group") + .setGenerationid(1) + .setMemberId("memberId")); } private HeartbeatResponse heartbeatResponse(Errors error) { - return new HeartbeatResponse(error); + return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(error.code())); } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 43057a6..d9a9ed1 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -53,6 +53,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.message.HeartbeatRequestData; +import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.message.InitProducerIdRequestData; import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; @@ -772,11 +774,14 @@ public class RequestResponseTest { } private HeartbeatRequest createHeartBeatRequest() { - return new HeartbeatRequest.Builder("group1", 1, "consumer1").build(); + return new HeartbeatRequest.Builder(new HeartbeatRequestData() + .setGroupId("group1") + .setGenerationid(1) + .setMemberId("consumer1")).build(); } private HeartbeatResponse createHeartBeatResponse() { - return new HeartbeatResponse(Errors.NONE); + return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(Errors.NONE.code())); } private JoinGroupRequest createJoinGroupRequest(int version) { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6bd7174..ef921bf 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1424,7 +1424,10 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a heartbeat response def sendResponseCallback(error: Errors) { def createResponse(requestThrottleMs: Int): AbstractResponse = { - val response = new HeartbeatResponse(requestThrottleMs, error) + val response = new HeartbeatResponse( + new HeartbeatResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setErrorCode(error.code)) trace("Sending heartbeat response %s for correlation id %d to client %s." .format(response, request.header.correlationId, request.header.clientId)) response @@ -1432,15 +1435,18 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseMaybeThrottle(request, createResponse) } - if (!authorize(request.session, Read, Resource(Group, heartbeatRequest.groupId, LITERAL))) { + if (!authorize(request.session, Read, Resource(Group, heartbeatRequest.data.groupId, LITERAL))) { sendResponseMaybeThrottle(request, requestThrottleMs => - new HeartbeatResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)) + new HeartbeatResponse( + new HeartbeatResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))) } else { // let the coordinator to handle heartbeat groupCoordinator.handleHeartbeat( - heartbeatRequest.groupId, - heartbeatRequest.memberId, - heartbeatRequest.groupGenerationId, + heartbeatRequest.data.groupId, + heartbeatRequest.data.memberId, + heartbeatRequest.data.generationid, sendResponseCallback) } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index aa6a0dd..fa5e91a 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -380,7 +380,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ).build() } - private def heartbeatRequest = new HeartbeatRequest.Builder(group, 1, "").build() + private def heartbeatRequest = new HeartbeatRequest.Builder(new HeartbeatRequestData().setGroupId(group).setGenerationid(1).setMemberId("")).build() private def leaveGroupRequest = new LeaveGroupRequest.Builder(new LeaveGroupRequestData().setGroupId(group).setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)).build() diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index eadcffb..782f873 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -300,7 +300,7 @@ class RequestQuotaTest extends BaseRequestTest { ) case ApiKeys.HEARTBEAT => - new HeartbeatRequest.Builder("test-group", 1, "") + new HeartbeatRequest.Builder(new HeartbeatRequestData().setGroupId("test-group").setGenerationid(1).setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)) case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest.Builder(new LeaveGroupRequestData().setGroupId("test-leave-group").setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)) @@ -503,7 +503,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.FIND_COORDINATOR => new FindCoordinatorResponse(response, ApiKeys.FIND_COORDINATOR.latestVersion).throttleTimeMs case ApiKeys.JOIN_GROUP => new JoinGroupResponse(response).throttleTimeMs - case ApiKeys.HEARTBEAT => new HeartbeatResponse(response).throttleTimeMs + case ApiKeys.HEARTBEAT => new HeartbeatResponse(response, ApiKeys.HEARTBEAT.latestVersion).throttleTimeMs case ApiKeys.LEAVE_GROUP => new LeaveGroupResponse(response).throttleTimeMs case ApiKeys.SYNC_GROUP => new SyncGroupResponse(response).throttleTimeMs case ApiKeys.DESCRIBE_GROUPS =>