Repository: kafka Updated Branches: refs/heads/1.0 8a175b0ee -> e409f847f
MINOR: Factor out some common group/transactional fields in request objects Author: Jason Gustafson <ja...@confluent.io> Reviewers: tedyu <yuzhih...@gmail.com>, Ismael Juma <ism...@juma.me.uk> Closes #4047 from hachikuji/factor-out-some-common-fields Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e409f847 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e409f847 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e409f847 Branch: refs/heads/1.0 Commit: e409f847f549f4fe10c552110366c646c1e44138 Parents: 8a175b0 Author: Jason Gustafson <ja...@confluent.io> Authored: Mon Oct 9 13:03:20 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Oct 10 12:49:21 2017 -0700 ---------------------------------------------------------------------- .../kafka/common/protocol/CommonFields.java | 13 +++++- .../kafka/common/protocol/types/Struct.java | 14 ++++++ .../common/requests/AddOffsetsToTxnRequest.java | 37 +++++++--------- .../requests/AddPartitionsToTxnRequest.java | 27 ++++++------ .../common/requests/DescribeGroupsResponse.java | 16 +++---- .../kafka/common/requests/EndTxnRequest.java | 27 ++++++------ .../kafka/common/requests/HeartbeatRequest.java | 28 ++++++------ .../common/requests/InitProducerIdResponse.java | 22 ++++------ .../kafka/common/requests/JoinGroupRequest.java | 20 ++++----- .../common/requests/JoinGroupResponse.java | 27 ++++++------ .../common/requests/LeaveGroupRequest.java | 19 ++++----- .../common/requests/ListGroupsResponse.java | 8 ++-- .../common/requests/OffsetCommitRequest.java | 45 ++++++++------------ .../common/requests/OffsetFetchRequest.java | 11 +++-- .../kafka/common/requests/SyncGroupRequest.java | 32 +++++++------- .../common/requests/TxnOffsetCommitRequest.java | 34 +++++++-------- 16 files changed, 182 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java index e12cde4..472a791 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java @@ -27,7 +27,18 @@ public class CommonFields { public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Response error code"); public static final Field.NullableStr ERROR_MESSAGE = new Field.NullableStr("error_message", "Response error message"); - // ACL Apis + // Group APIs + public static final Field.Str GROUP_ID = new Field.Str("group_id", "The unique group identifier"); + public static final Field.Int32 GENERATION_ID = new Field.Int32("generation_id", "The generation of the group."); + public static final Field.Str MEMBER_ID = new Field.Str("member_id", "The member id assigned by the group " + + "coordinator or null if joining for the first time."); + + // Transactional APIs + public static final Field.Str TRANSACTIONAL_ID = new Field.Str("transactional_id", "The transactional id corresponding to the transaction."); + public static final Field.Int64 PRODUCER_ID = new Field.Int64("producer_id", "Current producer id in use by the transactional id."); + public static final Field.Int16 PRODUCER_EPOCH = new Field.Int16("producer_epoch", "Current epoch associated with the producer id."); + + // ACL APIs public static final Field.Int8 RESOURCE_TYPE = new Field.Int8("resource_type", "The resource type"); public static final Field.Str RESOURCE_NAME = new Field.Str("resource_name", "The resource name"); public static final Field.NullableStr RESOURCE_NAME_FILTER = new Field.NullableStr("resource_name", "The 
resource name filter"); http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 1cbbcb3..b825201 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -83,6 +83,10 @@ public class Struct { return getInt(field.name); } + public Long get(Field.Int64 field) { + return getLong(field.name); + } + public Short get(Field.Int16 field) { return getShort(field.name); } @@ -113,6 +117,12 @@ public class Struct { return alternative; } + public String getOrElse(Field.Str field, String alternative) { + if (hasField(field.name)) + return getString(field.name); + return alternative; + } + /** * Get the record value for the field with the given name by doing a hash table lookup (slower!) 
* @@ -270,6 +280,10 @@ public class Struct { return set(def.name, value); } + public Struct set(Field.Int64 def, long value) { + return set(def.name, value); + } + public Struct set(Field.Int16 def, short value) { return set(def.name, value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java index e3e4d79..f6a1722 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java @@ -18,27 +18,22 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import static org.apache.kafka.common.protocol.types.Type.INT16; -import static org.apache.kafka.common.protocol.types.Type.INT64; -import static org.apache.kafka.common.protocol.types.Type.STRING; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; +import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH; +import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID; +import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID; public class AddOffsetsToTxnRequest extends AbstractRequest { - private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; - private static final String PRODUCER_ID_KEY_NAME = "producer_id"; - private static final String EPOCH_KEY_NAME = "producer_epoch"; - private static final String 
CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id"; - private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema( - new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."), - new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."), - new Field(EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."), - new Field(CONSUMER_GROUP_ID_KEY_NAME, STRING, "Consumer group id whose offsets should be included in the transaction.")); + TRANSACTIONAL_ID, + PRODUCER_ID, + PRODUCER_EPOCH, + GROUP_ID); public static Schema[] schemaVersions() { return new Schema[]{ADD_OFFSETS_TO_TXN_REQUEST_V0}; @@ -95,10 +90,10 @@ public class AddOffsetsToTxnRequest extends AbstractRequest { public AddOffsetsToTxnRequest(Struct struct, short version) { super(version); - this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); - this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); - this.producerEpoch = struct.getShort(EPOCH_KEY_NAME); - this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME); + this.transactionalId = struct.get(TRANSACTIONAL_ID); + this.producerId = struct.get(PRODUCER_ID); + this.producerEpoch = struct.get(PRODUCER_EPOCH); + this.consumerGroupId = struct.get(GROUP_ID); } public String transactionalId() { @@ -120,10 +115,10 @@ public class AddOffsetsToTxnRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.requestSchema(version())); - struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); - struct.set(PRODUCER_ID_KEY_NAME, producerId); - struct.set(EPOCH_KEY_NAME, producerEpoch); - struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId); + struct.set(TRANSACTIONAL_ID, transactionalId); + struct.set(PRODUCER_ID, producerId); + struct.set(PRODUCER_EPOCH, producerEpoch); + struct.set(GROUP_ID, consumerGroupId); return struct; } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java index c195e24..0ca32be 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java @@ -31,23 +31,20 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH; +import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT16; +import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID; import static org.apache.kafka.common.protocol.types.Type.INT32; -import static org.apache.kafka.common.protocol.types.Type.INT64; -import static org.apache.kafka.common.protocol.types.Type.STRING; public class AddPartitionsToTxnRequest extends AbstractRequest { - private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; - private static final String PRODUCER_ID_KEY_NAME = "producer_id"; - private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; private static final String TOPICS_KEY_NAME = "topics"; private static final String PARTITIONS_KEY_NAME = "partitions"; private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema( - new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."), - new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."), - new Field(PRODUCER_EPOCH_KEY_NAME, 
INT16, "Current epoch associated with the producer id."), + TRANSACTIONAL_ID, + PRODUCER_ID, + PRODUCER_EPOCH, new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( TOPIC_NAME, new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))), @@ -109,9 +106,9 @@ public class AddPartitionsToTxnRequest extends AbstractRequest { public AddPartitionsToTxnRequest(Struct struct, short version) { super(version); - this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); - this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); - this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME); + this.transactionalId = struct.get(TRANSACTIONAL_ID); + this.producerId = struct.get(PRODUCER_ID); + this.producerEpoch = struct.get(PRODUCER_EPOCH); List<TopicPartition> partitions = new ArrayList<>(); Object[] topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME); @@ -144,9 +141,9 @@ public class AddPartitionsToTxnRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.requestSchema(version())); - struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); - struct.set(PRODUCER_ID_KEY_NAME, producerId); - struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch); + struct.set(TRANSACTIONAL_ID, transactionalId); + struct.set(PRODUCER_ID, producerId); + struct.set(PRODUCER_EPOCH, producerEpoch); Map<String, List<Integer>> mappedPartitions = CollectionUtils.groupDataByTopic(partitions); Object[] partitionsArray = new Object[mappedPartitions.size()]; http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index 61c5a36..8dd6a17 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; +import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID; import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; import static org.apache.kafka.common.protocol.types.Type.BYTES; import static org.apache.kafka.common.protocol.types.Type.STRING; @@ -39,20 +41,18 @@ public class DescribeGroupsResponse extends AbstractResponse { private static final String GROUPS_KEY_NAME = "groups"; - private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String GROUP_STATE_KEY_NAME = "state"; private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; private static final String PROTOCOL_KEY_NAME = "protocol"; private static final String MEMBERS_KEY_NAME = "members"; - private static final String MEMBER_ID_KEY_NAME = "member_id"; private static final String CLIENT_ID_KEY_NAME = "client_id"; private static final String CLIENT_HOST_KEY_NAME = "client_host"; private static final String MEMBER_METADATA_KEY_NAME = "member_metadata"; private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment"; private static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema( - new Field(MEMBER_ID_KEY_NAME, STRING, "The memberId assigned by the coordinator"), + MEMBER_ID, new Field(CLIENT_ID_KEY_NAME, STRING, "The client id used in the member's latest join group request"), new Field(CLIENT_HOST_KEY_NAME, STRING, "The client host used in the request session corresponding to the " + "member's join group."), @@ -63,7 +63,7 @@ public class DescribeGroupsResponse extends AbstractResponse { private static final Schema 
DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema( ERROR_CODE, - new Field(GROUP_ID_KEY_NAME, STRING), + GROUP_ID, new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one of: Dead, Stable, AwaitingSync, " + "PreparingRebalance, or empty if there is no active group)"), new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group protocol type (will be empty if there is no active group)"), @@ -112,7 +112,7 @@ public class DescribeGroupsResponse extends AbstractResponse { for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) { Struct groupStruct = (Struct) groupObj; - String groupId = groupStruct.getString(GROUP_ID_KEY_NAME); + String groupId = groupStruct.get(GROUP_ID); Errors error = Errors.forCode(groupStruct.get(ERROR_CODE)); String state = groupStruct.getString(GROUP_STATE_KEY_NAME); String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME); @@ -121,7 +121,7 @@ public class DescribeGroupsResponse extends AbstractResponse { List<GroupMember> members = new ArrayList<>(); for (Object memberObj : groupStruct.getArray(MEMBERS_KEY_NAME)) { Struct memberStruct = (Struct) memberObj; - String memberId = memberStruct.getString(MEMBER_ID_KEY_NAME); + String memberId = memberStruct.get(MEMBER_ID); String clientId = memberStruct.getString(CLIENT_ID_KEY_NAME); String clientHost = memberStruct.getString(CLIENT_HOST_KEY_NAME); ByteBuffer memberMetadata = memberStruct.getBytes(MEMBER_METADATA_KEY_NAME); @@ -259,7 +259,7 @@ public class DescribeGroupsResponse extends AbstractResponse { for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) { Struct groupStruct = struct.instance(GROUPS_KEY_NAME); GroupMetadata group = groupEntry.getValue(); - groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey()); + groupStruct.set(GROUP_ID, groupEntry.getKey()); groupStruct.set(ERROR_CODE, group.error.code()); groupStruct.set(GROUP_STATE_KEY_NAME, group.state); groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType); @@ -267,7 +267,7 @@ 
public class DescribeGroupsResponse extends AbstractResponse { List<Struct> membersList = new ArrayList<>(); for (GroupMember member : group.members) { Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME); - memberStruct.set(MEMBER_ID_KEY_NAME, member.memberId); + memberStruct.set(MEMBER_ID, member.memberId); memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId); memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost); memberStruct.set(MEMBER_METADATA_KEY_NAME, member.memberMetadata); http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java index 243e9f5..9118d6a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java @@ -24,21 +24,18 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; +import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH; +import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID; +import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID; import static org.apache.kafka.common.protocol.types.Type.BOOLEAN; -import static org.apache.kafka.common.protocol.types.Type.INT16; -import static org.apache.kafka.common.protocol.types.Type.INT64; -import static org.apache.kafka.common.protocol.types.Type.STRING; public class EndTxnRequest extends AbstractRequest { - private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; - private static final String PRODUCER_ID_KEY_NAME = "producer_id"; - private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; private static final String TRANSACTION_RESULT_KEY_NAME = 
"transaction_result"; private static final Schema END_TXN_REQUEST_V0 = new Schema( - new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."), - new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."), - new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."), + TRANSACTIONAL_ID, + PRODUCER_ID, + PRODUCER_EPOCH, new Field(TRANSACTION_RESULT_KEY_NAME, BOOLEAN, "The result of the transaction (0 = ABORT, 1 = COMMIT)")); public static Schema[] schemaVersions() { @@ -96,9 +93,9 @@ public class EndTxnRequest extends AbstractRequest { public EndTxnRequest(Struct struct, short version) { super(version); - this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); - this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); - this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME); + this.transactionalId = struct.get(TRANSACTIONAL_ID); + this.producerId = struct.get(PRODUCER_ID); + this.producerEpoch = struct.get(PRODUCER_EPOCH); this.result = TransactionResult.forId(struct.getBoolean(TRANSACTION_RESULT_KEY_NAME)); } @@ -121,9 +118,9 @@ public class EndTxnRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.END_TXN.requestSchema(version())); - struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); - struct.set(PRODUCER_ID_KEY_NAME, producerId); - struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch); + struct.set(TRANSACTIONAL_ID, transactionalId); + struct.set(PRODUCER_ID, producerId); + struct.set(PRODUCER_EPOCH, producerEpoch); struct.set(TRANSACTION_RESULT_KEY_NAME, result.id); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index 00a806f..7d84918 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -18,24 +18,20 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import static org.apache.kafka.common.protocol.types.Type.INT32; -import static org.apache.kafka.common.protocol.types.Type.STRING; +import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; +import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID; public class HeartbeatRequest extends AbstractRequest { - private static final String GROUP_ID_KEY_NAME = "group_id"; - private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id"; - private static final String MEMBER_ID_KEY_NAME = "member_id"; - private static final Schema HEARTBEAT_REQUEST_V0 = new Schema( - new Field(GROUP_ID_KEY_NAME, STRING, "The group id."), - new Field(GROUP_GENERATION_ID_KEY_NAME, INT32, "The generation of the group."), - new Field(MEMBER_ID_KEY_NAME, STRING, "The member id assigned by the group coordinator.")); + GROUP_ID, + GENERATION_ID, + MEMBER_ID); /* v1 request is the same as v0. 
Throttle time has been added to response */ private static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0; @@ -86,9 +82,9 @@ public class HeartbeatRequest extends AbstractRequest { public HeartbeatRequest(Struct struct, short version) { super(version); - groupId = struct.getString(GROUP_ID_KEY_NAME); - groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME); - memberId = struct.getString(MEMBER_ID_KEY_NAME); + groupId = struct.get(GROUP_ID); + groupGenerationId = struct.get(GENERATION_ID); + memberId = struct.get(MEMBER_ID); } @Override @@ -124,9 +120,9 @@ public class HeartbeatRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.HEARTBEAT.requestSchema(version())); - struct.set(GROUP_ID_KEY_NAME, groupId); - struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId); - struct.set(MEMBER_ID_KEY_NAME, memberId); + struct.set(GROUP_ID, groupId); + struct.set(GENERATION_ID, groupGenerationId); + struct.set(MEMBER_ID, memberId); return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java index 9ecb21f..7a988ca 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java @@ -18,7 +18,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import 
org.apache.kafka.common.record.RecordBatch; @@ -27,9 +26,9 @@ import java.nio.ByteBuffer; import java.util.Map; import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH; +import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID; import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.types.Type.INT16; -import static org.apache.kafka.common.protocol.types.Type.INT64; public class InitProducerIdResponse extends AbstractResponse { // Possible error codes: @@ -39,16 +38,11 @@ public class InitProducerIdResponse extends AbstractResponse { // TransactionalIdAuthorizationFailed // ClusterAuthorizationFailed - private static final String PRODUCER_ID_KEY_NAME = "producer_id"; - private static final String EPOCH_KEY_NAME = "producer_epoch"; - private static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema( THROTTLE_TIME_MS, ERROR_CODE, - new Field(PRODUCER_ID_KEY_NAME, INT64, "The producer id for the input transactional id. If the input " + - "id was empty, then this is used only for ensuring idempotence of messages."), - new Field(EPOCH_KEY_NAME, INT16, "The epoch for the producer id. 
Will always be 0 if no transactional " + - "id was specified in the request.")); + PRODUCER_ID, + PRODUCER_EPOCH); public static Schema[] schemaVersions() { return new Schema[]{INIT_PRODUCER_ID_RESPONSE_V0}; @@ -69,8 +63,8 @@ public class InitProducerIdResponse extends AbstractResponse { public InitProducerIdResponse(Struct struct) { this.throttleTimeMs = struct.get(THROTTLE_TIME_MS); this.error = Errors.forCode(struct.get(ERROR_CODE)); - this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); - this.epoch = struct.getShort(EPOCH_KEY_NAME); + this.producerId = struct.get(PRODUCER_ID); + this.epoch = struct.get(PRODUCER_EPOCH); } public InitProducerIdResponse(int throttleTimeMs, Errors errors) { @@ -102,8 +96,8 @@ public class InitProducerIdResponse extends AbstractResponse { protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version)); struct.set(THROTTLE_TIME_MS, throttleTimeMs); - struct.set(PRODUCER_ID_KEY_NAME, producerId); - struct.set(EPOCH_KEY_NAME, epoch); + struct.set(PRODUCER_ID, producerId); + struct.set(PRODUCER_EPOCH, epoch); struct.set(ERROR_CODE, error.code()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index b2ff133..a7b62a9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -29,15 +29,15 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; +import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID; import static 
org.apache.kafka.common.protocol.types.Type.BYTES; import static org.apache.kafka.common.protocol.types.Type.INT32; import static org.apache.kafka.common.protocol.types.Type.STRING; public class JoinGroupRequest extends AbstractRequest { - private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout"; private static final String REBALANCE_TIMEOUT_KEY_NAME = "rebalance_timeout"; - private static final String MEMBER_ID_KEY_NAME = "member_id"; private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; private static final String GROUP_PROTOCOLS_KEY_NAME = "group_protocols"; private static final String PROTOCOL_NAME_KEY_NAME = "protocol_name"; @@ -49,21 +49,21 @@ public class JoinGroupRequest extends AbstractRequest { new Field(PROTOCOL_METADATA_KEY_NAME, BYTES)); private static final Schema JOIN_GROUP_REQUEST_V0 = new Schema( - new Field(GROUP_ID_KEY_NAME, STRING, "The group id."), + GROUP_ID, new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator considers the consumer dead if it receives " + "no heartbeat after this timeout in ms."), - new Field(MEMBER_ID_KEY_NAME, STRING, "The assigned consumer id or an empty string for a new consumer."), + MEMBER_ID, new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"), new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " + "that the member supports")); private static final Schema JOIN_GROUP_REQUEST_V1 = new Schema( - new Field(GROUP_ID_KEY_NAME, STRING, "The group id."), + GROUP_ID, new Field(SESSION_TIMEOUT_KEY_NAME, INT32, "The coordinator considers the consumer dead if it receives no " + "heartbeat after this timeout in ms."), new Field(REBALANCE_TIMEOUT_KEY_NAME, INT32, "The maximum time that the coordinator will wait for each " + "member to rejoin when rebalancing the group"), - new Field(MEMBER_ID_KEY_NAME, STRING, "The assigned consumer id or 
an empty string for a new consumer."), + MEMBER_ID, new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "Unique name for class of protocols implemented by group"), new Field(GROUP_PROTOCOLS_KEY_NAME, new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols " + "that the member supports")); @@ -166,7 +166,7 @@ public class JoinGroupRequest extends AbstractRequest { public JoinGroupRequest(Struct struct, short versionId) { super(versionId); - groupId = struct.getString(GROUP_ID_KEY_NAME); + groupId = struct.get(GROUP_ID); sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME); if (struct.hasField(REBALANCE_TIMEOUT_KEY_NAME)) @@ -176,7 +176,7 @@ public class JoinGroupRequest extends AbstractRequest { // v0 had no rebalance timeout but used session timeout implicitly rebalanceTimeout = sessionTimeout; - memberId = struct.getString(MEMBER_ID_KEY_NAME); + memberId = struct.get(MEMBER_ID); protocolType = struct.getString(PROTOCOL_TYPE_KEY_NAME); groupProtocols = new ArrayList<>(); @@ -249,12 +249,12 @@ public class JoinGroupRequest extends AbstractRequest { protected Struct toStruct() { short version = version(); Struct struct = new Struct(ApiKeys.JOIN_GROUP.requestSchema(version)); - struct.set(GROUP_ID_KEY_NAME, groupId); + struct.set(GROUP_ID, groupId); struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout); if (version >= 1) { struct.set(REBALANCE_TIMEOUT_KEY_NAME, rebalanceTimeout); } - struct.set(MEMBER_ID_KEY_NAME, memberId); + struct.set(MEMBER_ID, memberId); struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType); List<Struct> groupProtocolsList = new ArrayList<>(groupProtocols.size()); for (ProtocolMetadata protocol : groupProtocols) { http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index 56491eb..7f5ad3f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -30,31 +30,30 @@ import java.util.List; import java.util.Map; import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID; +import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID; import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; import static org.apache.kafka.common.protocol.types.Type.BYTES; -import static org.apache.kafka.common.protocol.types.Type.INT32; import static org.apache.kafka.common.protocol.types.Type.STRING; public class JoinGroupResponse extends AbstractResponse { - private static final String GENERATION_ID_KEY_NAME = "generation_id"; private static final String GROUP_PROTOCOL_KEY_NAME = "group_protocol"; private static final String LEADER_ID_KEY_NAME = "leader_id"; - private static final String MEMBER_ID_KEY_NAME = "member_id"; private static final String MEMBERS_KEY_NAME = "members"; private static final String MEMBER_METADATA_KEY_NAME = "member_metadata"; private static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema( - new Field(MEMBER_ID_KEY_NAME, STRING), + MEMBER_ID, new Field(MEMBER_METADATA_KEY_NAME, BYTES)); private static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema( ERROR_CODE, - new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the consumer group."), + GENERATION_ID, new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"), new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"), - new Field(MEMBER_ID_KEY_NAME, STRING, "The consumer id assigned by the group coordinator."), + MEMBER_ID, new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); private 
static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0; @@ -62,10 +61,10 @@ public class JoinGroupResponse extends AbstractResponse { private static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema( THROTTLE_TIME_MS, ERROR_CODE, - new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the consumer group."), + GENERATION_ID, new Field(GROUP_PROTOCOL_KEY_NAME, STRING, "The group protocol selected by the coordinator"), new Field(LEADER_ID_KEY_NAME, STRING, "The leader of the group"), - new Field(MEMBER_ID_KEY_NAME, STRING, "The consumer id assigned by the group coordinator."), + MEMBER_ID, new Field(MEMBERS_KEY_NAME, new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); @@ -128,14 +127,14 @@ public class JoinGroupResponse extends AbstractResponse { for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) { Struct memberData = (Struct) memberDataObj; - String memberId = memberData.getString(MEMBER_ID_KEY_NAME); + String memberId = memberData.get(MEMBER_ID); ByteBuffer memberMetadata = memberData.getBytes(MEMBER_METADATA_KEY_NAME); members.put(memberId, memberMetadata); } error = Errors.forCode(struct.get(ERROR_CODE)); - generationId = struct.getInt(GENERATION_ID_KEY_NAME); + generationId = struct.get(GENERATION_ID); groupProtocol = struct.getString(GROUP_PROTOCOL_KEY_NAME); - memberId = struct.getString(MEMBER_ID_KEY_NAME); + memberId = struct.get(MEMBER_ID); leaderId = struct.getString(LEADER_ID_KEY_NAME); } @@ -186,15 +185,15 @@ public class JoinGroupResponse extends AbstractResponse { struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); struct.set(ERROR_CODE, error.code()); - struct.set(GENERATION_ID_KEY_NAME, generationId); + struct.set(GENERATION_ID, generationId); struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol); - struct.set(MEMBER_ID_KEY_NAME, memberId); + struct.set(MEMBER_ID, memberId); struct.set(LEADER_ID_KEY_NAME, leaderId); List<Struct> memberArray = new ArrayList<>(); for (Map.Entry<String, ByteBuffer> entries : members.entrySet()) { 
Struct memberData = struct.instance(MEMBERS_KEY_NAME); - memberData.set(MEMBER_ID_KEY_NAME, entries.getKey()); + memberData.set(MEMBER_ID, entries.getKey()); memberData.set(MEMBER_METADATA_KEY_NAME, entries.getValue()); memberArray.add(memberData); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java index 661eb7f..b0d0ad6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java @@ -18,21 +18,18 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import static org.apache.kafka.common.protocol.types.Type.STRING; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; +import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID; public class LeaveGroupRequest extends AbstractRequest { - private static final String GROUP_ID_KEY_NAME = "group_id"; - private static final String MEMBER_ID_KEY_NAME = "member_id"; - private static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema( - new Field(GROUP_ID_KEY_NAME, STRING, "The group id."), - new Field(MEMBER_ID_KEY_NAME, STRING, "The member id assigned by the group coordinator.")); + GROUP_ID, + MEMBER_ID); /* v1 request is the same as v0. 
Throttle time has been added to response */ private static final Schema LEAVE_GROUP_REQUEST_V1 = LEAVE_GROUP_REQUEST_V0; @@ -78,8 +75,8 @@ public class LeaveGroupRequest extends AbstractRequest { public LeaveGroupRequest(Struct struct, short version) { super(version); - groupId = struct.getString(GROUP_ID_KEY_NAME); - memberId = struct.getString(MEMBER_ID_KEY_NAME); + groupId = struct.get(GROUP_ID); + memberId = struct.get(MEMBER_ID); } @Override @@ -111,8 +108,8 @@ public class LeaveGroupRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.LEAVE_GROUP.requestSchema(version())); - struct.set(GROUP_ID_KEY_NAME, groupId); - struct.set(MEMBER_ID_KEY_NAME, memberId); + struct.set(GROUP_ID, groupId); + struct.set(MEMBER_ID, memberId); return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java index afc5ebd..9c82ae0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -29,17 +29,17 @@ import java.util.List; import java.util.Map; import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; import static org.apache.kafka.common.protocol.types.Type.STRING; public class ListGroupsResponse extends AbstractResponse { private static final String GROUPS_KEY_NAME = "groups"; - private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; 
private static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema( - new Field(GROUP_ID_KEY_NAME, STRING), + GROUP_ID, new Field(PROTOCOL_TYPE_KEY_NAME, STRING)); private static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema( ERROR_CODE, @@ -80,7 +80,7 @@ public class ListGroupsResponse extends AbstractResponse { this.groups = new ArrayList<>(); for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) { Struct groupStruct = (Struct) groupObj; - String groupId = groupStruct.getString(GROUP_ID_KEY_NAME); + String groupId = groupStruct.get(GROUP_ID); String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME); this.groups.add(new Group(groupId, protocolType)); } @@ -130,7 +130,7 @@ public class ListGroupsResponse extends AbstractResponse { List<Struct> groupList = new ArrayList<>(); for (Group group : groups) { Struct groupStruct = struct.instance(GROUPS_KEY_NAME); - groupStruct.set(GROUP_ID_KEY_NAME, group.groupId); + groupStruct.set(GROUP_ID, group.groupId); groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType); groupList.add(groupStruct); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 696d967..4686c3b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -32,20 +32,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; +import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID; import static 
org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT32; import static org.apache.kafka.common.protocol.types.Type.INT64; import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING; -import static org.apache.kafka.common.protocol.types.Type.STRING; /** * This wrapper supports both v0 and v1 of OffsetCommitRequest. */ public class OffsetCommitRequest extends AbstractRequest { - private static final String GROUP_ID_KEY_NAME = "group_id"; - private static final String GENERATION_ID_KEY_NAME = "group_generation_id"; - private static final String MEMBER_ID_KEY_NAME = "member_id"; private static final String TOPICS_KEY_NAME = "topics"; private static final String RETENTION_TIME_KEY_NAME = "retention_time"; @@ -89,19 +87,19 @@ public class OffsetCommitRequest extends AbstractRequest { new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2), "Partitions to commit offsets.")); private static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema( - new Field(GROUP_ID_KEY_NAME, STRING, "The group id."), + GROUP_ID, new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), "Topics to commit offsets.")); private static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema( - new Field(GROUP_ID_KEY_NAME, STRING, "The group id."), - new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the group."), - new Field(MEMBER_ID_KEY_NAME, STRING, "The member id assigned by the group coordinator."), + GROUP_ID, + GENERATION_ID, + MEMBER_ID, new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1), "Topics to commit offsets.")); private static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema( - new Field(GROUP_ID_KEY_NAME, STRING, "The group id."), - new Field(GENERATION_ID_KEY_NAME, INT32, "The generation of the consumer group."), - new Field(MEMBER_ID_KEY_NAME, STRING, "The consumer 
id assigned by the group coordinator."), + GROUP_ID, + GENERATION_ID, + MEMBER_ID, new Field(RETENTION_TIME_KEY_NAME, INT64, "Time period in ms to retain the offset."), new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2), "Topics to commit offsets.")); @@ -229,18 +227,11 @@ public class OffsetCommitRequest extends AbstractRequest { public OffsetCommitRequest(Struct struct, short versionId) { super(versionId); - groupId = struct.getString(GROUP_ID_KEY_NAME); - // This field only exists in v1. - if (struct.hasField(GENERATION_ID_KEY_NAME)) - generationId = struct.getInt(GENERATION_ID_KEY_NAME); - else - generationId = DEFAULT_GENERATION_ID; + groupId = struct.get(GROUP_ID); - // This field only exists in v1. - if (struct.hasField(MEMBER_ID_KEY_NAME)) - memberId = struct.getString(MEMBER_ID_KEY_NAME); - else - memberId = DEFAULT_MEMBER_ID; + // These fields only exist in v1 and later. + generationId = struct.getOrElse(GENERATION_ID, DEFAULT_GENERATION_ID); + memberId = struct.getOrElse(MEMBER_ID, DEFAULT_MEMBER_ID); // This field only exists in v2 if (struct.hasField(RETENTION_TIME_KEY_NAME)) @@ -274,7 +265,7 @@ public class OffsetCommitRequest extends AbstractRequest { public Struct toStruct() { short version = version(); Struct struct = new Struct(ApiKeys.OFFSET_COMMIT.requestSchema(version)); - struct.set(GROUP_ID_KEY_NAME, groupId); + struct.set(GROUP_ID, groupId); Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData); List<Struct> topicArray = new ArrayList<>(); @@ -297,10 +288,8 @@ public class OffsetCommitRequest extends AbstractRequest { topicArray.add(topicData); } struct.set(TOPICS_KEY_NAME, topicArray.toArray()); - if (struct.hasField(GENERATION_ID_KEY_NAME)) - struct.set(GENERATION_ID_KEY_NAME, generationId); - if (struct.hasField(MEMBER_ID_KEY_NAME)) - struct.set(MEMBER_ID_KEY_NAME, memberId); + struct.setIfExists(GENERATION_ID, generationId); + struct.setIfExists(MEMBER_ID, memberId); if 
(struct.hasField(RETENTION_TIME_KEY_NAME)) struct.set(RETENTION_TIME_KEY_NAME, retentionTime); return struct; http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 6d8b959..0db1c50 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -33,12 +33,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.STRING; public class OffsetFetchRequest extends AbstractRequest { - private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String TOPICS_KEY_NAME = "topics"; // topic level field names @@ -63,13 +62,13 @@ public class OffsetFetchRequest extends AbstractRequest { new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0), "Partitions to fetch offsets.")); private static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema( - new Field(GROUP_ID_KEY_NAME, STRING, "The consumer group id."), + GROUP_ID, new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0), "Topics to fetch offsets.")); private static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0; private static final Schema OFFSET_FETCH_REQUEST_V2 = new Schema( - new Field(GROUP_ID_KEY_NAME, STRING, "The consumer group id."), + GROUP_ID, new Field(TOPICS_KEY_NAME, 
ArrayOf.nullable(OFFSET_FETCH_REQUEST_TOPIC_V0), "Topics to fetch offsets. If the " + "topic array is null fetch offsets for all topics.")); @@ -153,7 +152,7 @@ public class OffsetFetchRequest extends AbstractRequest { partitions = null; - groupId = struct.getString(GROUP_ID_KEY_NAME); + groupId = struct.get(GROUP_ID); } public OffsetFetchResponse getErrorResponse(Errors error) { @@ -210,7 +209,7 @@ public class OffsetFetchRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.OFFSET_FETCH.requestSchema(version())); - struct.set(GROUP_ID_KEY_NAME, groupId); + struct.set(GROUP_ID, groupId); if (partitions != null) { Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions); http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java index 4ff9fcd..14ed262 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java @@ -30,24 +30,22 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.GENERATION_ID; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; +import static org.apache.kafka.common.protocol.CommonFields.MEMBER_ID; import static org.apache.kafka.common.protocol.types.Type.BYTES; -import static org.apache.kafka.common.protocol.types.Type.INT32; -import static org.apache.kafka.common.protocol.types.Type.STRING; public class SyncGroupRequest extends AbstractRequest { - private static final String GROUP_ID_KEY_NAME = "group_id"; - private static final String 
GENERATION_ID_KEY_NAME = "generation_id"; - private static final String MEMBER_ID_KEY_NAME = "member_id"; private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment"; private static final String GROUP_ASSIGNMENT_KEY_NAME = "group_assignment"; private static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema( - new Field(MEMBER_ID_KEY_NAME, STRING), + MEMBER_ID, new Field(MEMBER_ASSIGNMENT_KEY_NAME, BYTES)); private static final Schema SYNC_GROUP_REQUEST_V0 = new Schema( - new Field(GROUP_ID_KEY_NAME, STRING), - new Field(GENERATION_ID_KEY_NAME, INT32), - new Field(MEMBER_ID_KEY_NAME, STRING), + GROUP_ID, + GENERATION_ID, + MEMBER_ID, new Field(GROUP_ASSIGNMENT_KEY_NAME, new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0))); /* v1 request is the same as v0. Throttle time has been added to response */ @@ -106,15 +104,15 @@ public class SyncGroupRequest extends AbstractRequest { public SyncGroupRequest(Struct struct, short version) { super(version); - this.groupId = struct.getString(GROUP_ID_KEY_NAME); - this.generationId = struct.getInt(GENERATION_ID_KEY_NAME); - this.memberId = struct.getString(MEMBER_ID_KEY_NAME); + this.groupId = struct.get(GROUP_ID); + this.generationId = struct.get(GENERATION_ID); + this.memberId = struct.get(MEMBER_ID); groupAssignment = new HashMap<>(); for (Object memberDataObj : struct.getArray(GROUP_ASSIGNMENT_KEY_NAME)) { Struct memberData = (Struct) memberDataObj; - String memberId = memberData.getString(MEMBER_ID_KEY_NAME); + String memberId = memberData.get(MEMBER_ID); ByteBuffer memberMetadata = memberData.getBytes(MEMBER_ASSIGNMENT_KEY_NAME); groupAssignment.put(memberId, memberMetadata); } @@ -162,14 +160,14 @@ public class SyncGroupRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.SYNC_GROUP.requestSchema(version())); - struct.set(GROUP_ID_KEY_NAME, groupId); - struct.set(GENERATION_ID_KEY_NAME, generationId); - struct.set(MEMBER_ID_KEY_NAME, memberId); + 
struct.set(GROUP_ID, groupId); + struct.set(GENERATION_ID, generationId); + struct.set(MEMBER_ID, memberId); List<Struct> memberArray = new ArrayList<>(); for (Map.Entry<String, ByteBuffer> entries: groupAssignment.entrySet()) { Struct memberData = struct.instance(GROUP_ASSIGNMENT_KEY_NAME); - memberData.set(MEMBER_ID_KEY_NAME, entries.getKey()); + memberData.set(MEMBER_ID, entries.getKey()); memberData.set(MEMBER_ASSIGNMENT_KEY_NAME, entries.getValue()); memberArray.add(memberData); } http://git-wip-us.apache.org/repos/asf/kafka/blob/e409f847/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java index d384192..9787c2d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java @@ -29,18 +29,16 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; +import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH; +import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT16; +import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID; import static org.apache.kafka.common.protocol.types.Type.INT64; import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING; -import static org.apache.kafka.common.protocol.types.Type.STRING; public class TxnOffsetCommitRequest extends AbstractRequest { - private static final 
String TRANSACTIONAL_ID_KEY_NAME = "transactional_id"; - private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id"; - private static final String PRODUCER_ID_KEY_NAME = "producer_id"; - private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; private static final String TOPICS_KEY_NAME = "topics"; private static final String PARTITIONS_KEY_NAME = "partitions"; private static final String OFFSET_KEY_NAME = "offset"; @@ -52,10 +50,10 @@ public class TxnOffsetCommitRequest extends AbstractRequest { new Field(METADATA_KEY_NAME, NULLABLE_STRING)); private static final Schema TXN_OFFSET_COMMIT_REQUEST_V0 = new Schema( - new Field(TRANSACTIONAL_ID_KEY_NAME, STRING, "The transactional id corresponding to the transaction."), - new Field(CONSUMER_GROUP_ID_KEY_NAME, STRING, "Id of the associated consumer group to commit offsets for."), - new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."), - new Field(PRODUCER_EPOCH_KEY_NAME, INT16, "Current epoch associated with the producer id."), + TRANSACTIONAL_ID, + GROUP_ID, + PRODUCER_ID, + PRODUCER_EPOCH, new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( TOPIC_NAME, new Field(PARTITIONS_KEY_NAME, new ArrayOf(TXN_OFFSET_COMMIT_PARTITION_OFFSET_METADATA_REQUEST_V0)))), @@ -127,10 +125,10 @@ public class TxnOffsetCommitRequest extends AbstractRequest { public TxnOffsetCommitRequest(Struct struct, short version) { super(version); - this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME); - this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME); - this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME); - this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME); + this.transactionalId = struct.get(TRANSACTIONAL_ID); + this.consumerGroupId = struct.get(GROUP_ID); + this.producerId = struct.get(PRODUCER_ID); + this.producerEpoch = struct.get(PRODUCER_EPOCH); Map<TopicPartition, CommittedOffset> offsets = new HashMap<>(); Object[] 
topicPartitionsArray = struct.getArray(TOPICS_KEY_NAME); @@ -171,10 +169,10 @@ public class TxnOffsetCommitRequest extends AbstractRequest { @Override protected Struct toStruct() { Struct struct = new Struct(ApiKeys.TXN_OFFSET_COMMIT.requestSchema(version())); - struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId); - struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId); - struct.set(PRODUCER_ID_KEY_NAME, producerId); - struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch); + struct.set(TRANSACTIONAL_ID, transactionalId); + struct.set(GROUP_ID, consumerGroupId); + struct.set(PRODUCER_ID, producerId); + struct.set(PRODUCER_EPOCH, producerEpoch); Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = CollectionUtils.groupDataByTopic(offsets); Object[] partitionsArray = new Object[mappedPartitionOffsets.size()];