http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index 64bd3d3..dee6a5f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; + import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; import org.apache.kafka.common.record.Records; @@ -39,7 +39,6 @@ import java.util.Map; */ public class FetchResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id); private static final String RESPONSES_KEY_NAME = "responses"; // topic level field names @@ -71,7 +70,7 @@ public class FetchResponse extends AbstractResponse { public static final long INVALID_HIGHWATERMARK = -1L; private final LinkedHashMap<TopicPartition, PartitionData> responseData; - private final int throttleTime; + private final int throttleTimeMs; public static final class PartitionData { public final Errors error; @@ -92,35 +91,20 @@ public class FetchResponse extends AbstractResponse { } /** - * Constructor for version 3. - * - * The entries in `responseData` should be in the same order as the entries in `FetchRequest.fetchData`. - * - * @param responseData fetched data grouped by topic-partition - * @param throttleTime Time in milliseconds the response was throttled - */ - public FetchResponse(LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) { - this(3, responseData, throttleTime); - } - - /** * Constructor for all versions. * * From version 3, the entries in `responseData` should be in the same order as the entries in * `FetchRequest.fetchData`. * * @param responseData fetched data grouped by topic-partition - * @param throttleTime Time in milliseconds the response was throttled + * @param throttleTimeMs Time in milliseconds the response was throttled */ - public FetchResponse(int version, LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) { - super(writeStruct(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version)), version, responseData, - throttleTime)); + public FetchResponse(LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTimeMs) { this.responseData = responseData; - this.throttleTime = throttleTime; + this.throttleTimeMs = throttleTimeMs; } public FetchResponse(Struct struct) { - super(struct); LinkedHashMap<TopicPartition, PartitionData> responseData = new LinkedHashMap<>(); for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; @@ -137,22 +121,31 @@ public class FetchResponse extends AbstractResponse { } } this.responseData = responseData; - this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; + this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; + } + + @Override + public Struct toStruct(short version) { + return toStruct(version, responseData, throttleTimeMs); } @Override public Send toSend(String dest, RequestHeader requestHeader) { - ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId()); + return toSend(toStruct(requestHeader.apiVersion()), throttleTimeMs, dest, requestHeader); + } + + public Send toSend(Struct responseStruct, int throttleTimeMs, String dest, RequestHeader requestHeader) { + Struct responseHeader = new ResponseHeader(requestHeader.correlationId()).toStruct(); // write the total size and the response header ByteBuffer buffer = ByteBuffer.allocate(responseHeader.sizeOf() + 4); - buffer.putInt(responseHeader.sizeOf() + struct.sizeOf()); + buffer.putInt(responseHeader.sizeOf() + responseStruct.sizeOf()); responseHeader.writeTo(buffer); buffer.rewind(); List<Send> sends = new ArrayList<>(); sends.add(new ByteBufferSend(dest, buffer)); - addResponseData(dest, sends); + addResponseData(responseStruct, throttleTimeMs, dest, sends); return new MultiSend(dest, sends); } @@ -160,25 +153,20 @@ public class FetchResponse extends AbstractResponse { return responseData; } - public int getThrottleTime() { - return this.throttleTime; + public int throttleTimeMs() { + return this.throttleTimeMs; } - public static FetchResponse parse(ByteBuffer buffer) { - return new FetchResponse(CURRENT_SCHEMA.read(buffer)); - } - - public static FetchResponse parse(ByteBuffer buffer, int version) { + public static FetchResponse parse(ByteBuffer buffer, short version) { return new FetchResponse(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version).read(buffer)); } - private void addResponseData(String dest, List<Send> sends) { + private static void addResponseData(Struct struct, int throttleTimeMs, String dest, List<Send> sends) { Object[] allTopicData = struct.getArray(RESPONSES_KEY_NAME); if (struct.hasField(THROTTLE_TIME_KEY_NAME)) { - int throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME); ByteBuffer buffer = ByteBuffer.allocate(8); - buffer.putInt(throttleTime); + buffer.putInt(throttleTimeMs); buffer.putInt(allTopicData.length); buffer.rewind(); sends.add(new ByteBufferSend(dest, buffer)); @@ -193,7 +181,7 @@ public class FetchResponse extends AbstractResponse { addTopicData(dest, sends, (Struct) topicData); } - private void addTopicData(String dest, List<Send> sends, Struct topicData) { + private static void addTopicData(String dest, List<Send> sends, Struct topicData) { String topic = topicData.getString(TOPIC_KEY_NAME); Object[] allPartitionData = topicData.getArray(PARTITIONS_KEY_NAME); @@ -208,7 +196,7 @@ public class FetchResponse extends AbstractResponse { addPartitionData(dest, sends, (Struct) partitionData); } - private void addPartitionData(String dest, List<Send> sends, Struct partitionData) { + private static void addPartitionData(String dest, List<Send> sends, Struct partitionData) { Struct header = partitionData.getStruct(PARTITION_HEADER_KEY_NAME); Records records = partitionData.getRecords(RECORD_SET_KEY_NAME); @@ -223,10 +211,8 @@ public class FetchResponse extends AbstractResponse { sends.add(new RecordsSend(dest, records)); } - private static Struct writeStruct(Struct struct, - int version, - LinkedHashMap<TopicPartition, PartitionData> responseData, - int throttleTime) { + private static Struct toStruct(short version, LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version)); List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData = FetchRequest.TopicAndPartitionData.batchByTopic(responseData); List<Struct> topicArray = new ArrayList<>(); for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: topicsData) { @@ -255,10 +241,8 @@ public class FetchResponse extends AbstractResponse { return struct; } - public static int sizeOf(int version, LinkedHashMap<TopicPartition, PartitionData> responseData) { - Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version)); - writeStruct(struct, version, responseData, 0); - return 4 + struct.sizeOf(); + public static int sizeOf(short version, LinkedHashMap<TopicPartition, PartitionData> responseData) { + return 4 + toStruct(version, responseData, 0).sizeOf(); } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java index ed56f39..83d6cba 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java @@ -32,8 +32,7 @@ public class GroupCoordinatorRequest extends AbstractRequest { } @Override - public GroupCoordinatorRequest build() { - short version = version(); + public GroupCoordinatorRequest build(short version) { return new GroupCoordinatorRequest(this.groupId, version); } @@ -49,14 +48,12 @@ public class GroupCoordinatorRequest extends AbstractRequest { private final String groupId; private GroupCoordinatorRequest(String groupId, short version) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.GROUP_COORDINATOR.id, version)), - version); - struct.set(GROUP_ID_KEY_NAME, groupId); + super(version); this.groupId = groupId; } public GroupCoordinatorRequest(Struct struct, short versionId) { - super(struct, versionId); + super(versionId); groupId = struct.getString(GROUP_ID_KEY_NAME); } @@ -76,12 +73,15 @@ public class GroupCoordinatorRequest extends AbstractRequest { return groupId; } - public static GroupCoordinatorRequest parse(ByteBuffer buffer, int versionId) { + public static GroupCoordinatorRequest parse(ByteBuffer buffer, short versionId) { return new GroupCoordinatorRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_COORDINATOR.id, versionId, buffer), - (short) versionId); + versionId); } - public static GroupCoordinatorRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.GROUP_COORDINATOR.id)); + @Override + protected Struct toStruct() { + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.GROUP_COORDINATOR.id, version())); + struct.set(GROUP_ID_KEY_NAME, groupId); + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java index fc3d358..c13cf3c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java @@ -16,14 +16,12 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public class GroupCoordinatorResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_COORDINATOR.id); private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String COORDINATOR_KEY_NAME = "coordinator"; @@ -45,19 +43,11 @@ public class GroupCoordinatorResponse extends AbstractResponse { private final Node node; public GroupCoordinatorResponse(Errors error, Node node) { - super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, error.code()); - Struct coordinator = struct.instance(COORDINATOR_KEY_NAME); - coordinator.set(NODE_ID_KEY_NAME, node.id()); - coordinator.set(HOST_KEY_NAME, node.host()); - coordinator.set(PORT_KEY_NAME, node.port()); - struct.set(COORDINATOR_KEY_NAME, coordinator); this.error = error; this.node = node; } public GroupCoordinatorResponse(Struct struct) { - super(struct); error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME); int nodeId = broker.getInt(NODE_ID_KEY_NAME); @@ -74,7 +64,19 @@ public class GroupCoordinatorResponse extends AbstractResponse { return node; } - public static GroupCoordinatorResponse parse(ByteBuffer buffer) { - return new GroupCoordinatorResponse(CURRENT_SCHEMA.read(buffer)); + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.GROUP_COORDINATOR.id, version)); + struct.set(ERROR_CODE_KEY_NAME, error.code()); + Struct coordinator = struct.instance(COORDINATOR_KEY_NAME); + coordinator.set(NODE_ID_KEY_NAME, node.id()); + coordinator.set(HOST_KEY_NAME, node.host()); + coordinator.set(PORT_KEY_NAME, node.port()); + struct.set(COORDINATOR_KEY_NAME, coordinator); + return struct; + } + + public static GroupCoordinatorResponse parse(ByteBuffer buffer, short version) { + return new GroupCoordinatorResponse(ProtoUtils.parseResponse(ApiKeys.GROUP_COORDINATOR.id, version, buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index 7e79c8a..4440830 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -37,8 +37,8 @@ public class HeartbeatRequest extends AbstractRequest { } @Override - public HeartbeatRequest build() { - return new HeartbeatRequest(groupId, groupGenerationId, memberId, version()); + public HeartbeatRequest build(short version) { + return new HeartbeatRequest(groupId, groupGenerationId, memberId, version); } @Override @@ -58,18 +58,14 @@ public class HeartbeatRequest extends AbstractRequest { private final String memberId; private HeartbeatRequest(String groupId, int groupGenerationId, String memberId, short version) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.HEARTBEAT.id, version)), - version); - struct.set(GROUP_ID_KEY_NAME, groupId); - struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId); - struct.set(MEMBER_ID_KEY_NAME, memberId); + super(version); this.groupId = groupId; this.groupGenerationId = groupGenerationId; this.memberId = memberId; } - public HeartbeatRequest(Struct struct, short versionId) { - super(struct, versionId); + public HeartbeatRequest(Struct struct, short version) { + super(version); groupId = struct.getString(GROUP_ID_KEY_NAME); groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME); memberId = struct.getString(MEMBER_ID_KEY_NAME); @@ -99,11 +95,16 @@ public class HeartbeatRequest extends AbstractRequest { return memberId; } - public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) { - return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer), (short) versionId); + public static HeartbeatRequest parse(ByteBuffer buffer, short versionId) { + return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer), versionId); } - public static HeartbeatRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id)); + @Override + protected Struct toStruct() { + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.HEARTBEAT.id, version())); + struct.set(GROUP_ID_KEY_NAME, groupId); + struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId); + struct.set(MEMBER_ID_KEY_NAME, memberId); + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java index f36dec4..4cca846 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java @@ -15,14 +15,12 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public class HeartbeatResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id); private static final String ERROR_CODE_KEY_NAME = "error_code"; /** @@ -35,17 +33,13 @@ public class HeartbeatResponse extends AbstractResponse { * REBALANCE_IN_PROGRESS (27) * GROUP_AUTHORIZATION_FAILED (30) */ - private final Errors error; public HeartbeatResponse(Errors error) { - super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, error.code()); this.error = error; } public HeartbeatResponse(Struct struct) { - super(struct); error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); } @@ -53,7 +47,14 @@ public class HeartbeatResponse extends AbstractResponse { return error; } - public static HeartbeatResponse parse(ByteBuffer buffer) { - return new HeartbeatResponse(CURRENT_SCHEMA.read(buffer)); + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.HEARTBEAT.id, version)); + struct.set(ERROR_CODE_KEY_NAME, error.code()); + return struct; + } + + public static HeartbeatResponse parse(ByteBuffer buffer, short version) { + return new HeartbeatResponse(ProtoUtils.parseResponse(ApiKeys.HEARTBEAT.id, version, buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index ad0cdd0..37906a6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -85,8 +85,7 @@ public class JoinGroupRequest extends AbstractRequest { } @Override - public JoinGroupRequest build() { - short version = version(); + public JoinGroupRequest build(short version) { if (version < 1) { rebalanceTimeout = -1; } @@ -112,23 +111,7 @@ public class JoinGroupRequest extends AbstractRequest { private JoinGroupRequest(short version, String groupId, int sessionTimeout, int rebalanceTimeout, String memberId, String protocolType, List<ProtocolMetadata> groupProtocols) { - super(new Struct(ProtoUtils. - requestSchema(ApiKeys.JOIN_GROUP.id, version)), version); - struct.set(GROUP_ID_KEY_NAME, groupId); - struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout); - if (version >= 1) { - struct.set(REBALANCE_TIMEOUT_KEY_NAME, rebalanceTimeout); - } - struct.set(MEMBER_ID_KEY_NAME, memberId); - struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType); - List<Struct> groupProtocolsList = new ArrayList<>(groupProtocols.size()); - for (ProtocolMetadata protocol : groupProtocols) { - Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME); - protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name); - protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata); - groupProtocolsList.add(protocolStruct); - } - struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray()); + super(version); this.groupId = groupId; this.sessionTimeout = sessionTimeout; this.rebalanceTimeout = rebalanceTimeout; @@ -138,7 +121,7 @@ public class JoinGroupRequest extends AbstractRequest { } public JoinGroupRequest(Struct struct, short versionId) { - super(struct, versionId); + super(versionId); groupId = struct.getString(GROUP_ID_KEY_NAME); sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME); @@ -169,7 +152,6 @@ public class JoinGroupRequest extends AbstractRequest { case 0: case 1: return new JoinGroupResponse( - versionId, Errors.forException(e), JoinGroupResponse.UNKNOWN_GENERATION_ID, JoinGroupResponse.UNKNOWN_PROTOCOL, @@ -207,12 +189,29 @@ public class JoinGroupRequest extends AbstractRequest { return protocolType; } - public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) { - return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, buffer), - (short) versionId); + public static JoinGroupRequest parse(ByteBuffer buffer, short version) { + return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, version, buffer), version); } - public static JoinGroupRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)); + @Override + protected Struct toStruct() { + short version = version(); + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.JOIN_GROUP.id, version)); + struct.set(GROUP_ID_KEY_NAME, groupId); + struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout); + if (version >= 1) { + struct.set(REBALANCE_TIMEOUT_KEY_NAME, rebalanceTimeout); + } + struct.set(MEMBER_ID_KEY_NAME, memberId); + struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType); + List<Struct> groupProtocolsList = new ArrayList<>(groupProtocols.size()); + for (ProtocolMetadata protocol : groupProtocols) { + Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME); + protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name); + protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata); + groupProtocolsList.add(protocolStruct); + } + struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray()); + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index bc9366a..d2a323b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -15,7 +15,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -26,8 +25,6 @@ import java.util.Map; public class JoinGroupResponse extends AbstractResponse { - private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id); - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id); private static final String ERROR_CODE_KEY_NAME = "error_code"; /** @@ -67,33 +64,6 @@ public class JoinGroupResponse extends AbstractResponse { String memberId, String leaderId, Map<String, ByteBuffer> groupMembers) { - this(CURRENT_VERSION, error, generationId, groupProtocol, memberId, leaderId, groupMembers); - } - - public JoinGroupResponse(int version, - Errors error, - int generationId, - String groupProtocol, - String memberId, - String leaderId, - Map<String, ByteBuffer> groupMembers) { - super(new Struct(ProtoUtils.responseSchema(ApiKeys.JOIN_GROUP.id, version))); - - struct.set(ERROR_CODE_KEY_NAME, error.code()); - struct.set(GENERATION_ID_KEY_NAME, generationId); - struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol); - struct.set(MEMBER_ID_KEY_NAME, memberId); - struct.set(LEADER_ID_KEY_NAME, leaderId); - - List<Struct> memberArray = new ArrayList<>(); - for (Map.Entry<String, ByteBuffer> entries: groupMembers.entrySet()) { - Struct memberData = struct.instance(MEMBERS_KEY_NAME); - memberData.set(MEMBER_ID_KEY_NAME, entries.getKey()); - memberData.set(MEMBER_METADATA_KEY_NAME, entries.getValue()); - memberArray.add(memberData); - } - struct.set(MEMBERS_KEY_NAME, memberArray.toArray()); - this.error = error; this.generationId = generationId; this.groupProtocol = groupProtocol; @@ -103,7 +73,6 @@ public class JoinGroupResponse extends AbstractResponse { } public JoinGroupResponse(Struct struct) { - super(struct); members = new HashMap<>(); for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) { @@ -147,11 +116,29 @@ public class JoinGroupResponse extends AbstractResponse { return members; } - public static JoinGroupResponse parse(ByteBuffer buffer, int version) { - return new JoinGroupResponse(ProtoUtils.responseSchema(ApiKeys.JOIN_GROUP.id, version).read(buffer)); + public static JoinGroupResponse parse(ByteBuffer buffer, short version) { + return new JoinGroupResponse(ProtoUtils.parseResponse(ApiKeys.JOIN_GROUP.id, version, buffer)); } - public static JoinGroupResponse parse(ByteBuffer buffer) { - return new JoinGroupResponse(CURRENT_SCHEMA.read(buffer)); + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.JOIN_GROUP.id, version)); + + struct.set(ERROR_CODE_KEY_NAME, error.code()); + struct.set(GENERATION_ID_KEY_NAME, generationId); + struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol); + struct.set(MEMBER_ID_KEY_NAME, memberId); + struct.set(LEADER_ID_KEY_NAME, leaderId); + + List<Struct> memberArray = new ArrayList<>(); + for (Map.Entry<String, ByteBuffer> entries : members.entrySet()) { + Struct memberData = struct.instance(MEMBERS_KEY_NAME); + memberData.set(MEMBER_ID_KEY_NAME, entries.getKey()); + memberData.set(MEMBER_METADATA_KEY_NAME, entries.getValue()); + memberArray.add(memberData); + } + struct.set(MEMBERS_KEY_NAME, memberArray.toArray()); + + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index fde184a..c564b43 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -69,9 +69,8 @@ public class LeaderAndIsrRequest extends AbstractRequest { } @Override - public LeaderAndIsrRequest build() { - return new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates, - liveLeaders, version()); + public LeaderAndIsrRequest build(short version) { + return new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates, liveLeaders, version); } @Override @@ -94,46 +93,15 @@ public class LeaderAndIsrRequest extends AbstractRequest { private LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates, Set<Node> liveLeaders, short version) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.LEADER_AND_ISR.id, version)), - version); - struct.set(CONTROLLER_ID_KEY_NAME, controllerId); - struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); - - List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size()); - for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) { - Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME); - TopicPartition topicPartition = entry.getKey(); - partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic()); - partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition()); - PartitionState partitionState = entry.getValue(); - partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch); - partitionStateData.set(LEADER_KEY_NAME, partitionState.leader); - partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch); - partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray()); - partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion); - partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray()); - partitionStatesData.add(partitionStateData); - } - struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray()); - - List<Struct> leadersData = new ArrayList<>(liveLeaders.size()); - for (Node leader : liveLeaders) { - Struct leaderData = struct.instance(LIVE_LEADERS_KEY_NAME); - leaderData.set(END_POINT_ID_KEY_NAME, leader.id()); - leaderData.set(HOST_KEY_NAME, leader.host()); - leaderData.set(PORT_KEY_NAME, leader.port()); - leadersData.add(leaderData); - } - struct.set(LIVE_LEADERS_KEY_NAME, leadersData.toArray()); - + super(version); this.controllerId = controllerId; this.controllerEpoch = controllerEpoch; this.partitionStates = partitionStates; this.liveLeaders = liveLeaders; } - public LeaderAndIsrRequest(Struct struct, short versionId) { - super(struct, versionId); + public LeaderAndIsrRequest(Struct struct, short version) { + super(version); Map<TopicPartition, PartitionState> partitionStates = new HashMap<>(); for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) { @@ -177,6 +145,42 @@ public class LeaderAndIsrRequest extends AbstractRequest { } @Override + protected Struct toStruct() { + short version = version(); + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.LEADER_AND_ISR.id, version)); + struct.set(CONTROLLER_ID_KEY_NAME, controllerId); + struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); + + List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size()); + for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) { + Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME); + TopicPartition topicPartition = entry.getKey(); + partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic()); + partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition()); + PartitionState partitionState = entry.getValue(); + partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch); + partitionStateData.set(LEADER_KEY_NAME, partitionState.leader); + partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch); + partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray()); + partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion); + partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray()); + partitionStatesData.add(partitionStateData); + } + struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray()); + + List<Struct> leadersData = new ArrayList<>(liveLeaders.size()); + for (Node leader : liveLeaders) { + Struct leaderData = struct.instance(LIVE_LEADERS_KEY_NAME); + leaderData.set(END_POINT_ID_KEY_NAME, leader.id()); + leaderData.set(HOST_KEY_NAME, leader.host()); + leaderData.set(PORT_KEY_NAME, leader.port()); + leadersData.add(leaderData); + } + struct.set(LIVE_LEADERS_KEY_NAME, leadersData.toArray()); + return struct; + } + + @Override public AbstractResponse getErrorResponse(Throwable e) { Map<TopicPartition, Errors> responses = new HashMap<>(partitionStates.size()); for (TopicPartition partition : partitionStates.keySet()) { @@ -209,12 +213,8 @@ public class LeaderAndIsrRequest extends AbstractRequest { return liveLeaders; } - public static LeaderAndIsrRequest parse(ByteBuffer buffer, int versionId) { - return new LeaderAndIsrRequest(ProtoUtils.parseRequest(ApiKeys.LEADER_AND_ISR.id, versionId, buffer), - (short) versionId); + public static LeaderAndIsrRequest parse(ByteBuffer buffer, short versionId) { + return new LeaderAndIsrRequest(ProtoUtils.parseRequest(ApiKeys.LEADER_AND_ISR.id, versionId, buffer), versionId); } - public static LeaderAndIsrRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.LEADER_AND_ISR.id)); - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java index 4d0a05d..2b02daf 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java @@ -17,7 +17,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -27,7 +26,6 @@ import java.util.List; import java.util.Map; public class LeaderAndIsrResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LEADER_AND_ISR.id); private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String PARTITIONS_KEY_NAME = "partitions"; @@ -45,33 +43,12 @@ public class LeaderAndIsrResponse extends AbstractResponse { private final Map<TopicPartition, Errors> responses; - public LeaderAndIsrResponse(Map<TopicPartition, Errors> responses) { - this(Errors.NONE, responses); - } - public LeaderAndIsrResponse(Errors error, Map<TopicPartition, Errors> responses) { - super(new Struct(CURRENT_SCHEMA)); - - List<Struct> responseDatas = new ArrayList<>(responses.size()); - for (Map.Entry<TopicPartition, Errors> response : responses.entrySet()) { - Struct partitionData = struct.instance(PARTITIONS_KEY_NAME); - TopicPartition partition = response.getKey(); - partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic()); - partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition()); - partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue().code()); - responseDatas.add(partitionData); - } - - struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray()); - struct.set(ERROR_CODE_KEY_NAME, error.code()); - this.responses = responses; this.error = error; } public LeaderAndIsrResponse(Struct struct) { - super(struct); - responses = new HashMap<>(); for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) { Struct responseData = (Struct) responseDataObj; @@ -92,12 +69,27 @@ public class LeaderAndIsrResponse extends AbstractResponse { return error; } - public static LeaderAndIsrResponse parse(ByteBuffer buffer, int version) { + public static LeaderAndIsrResponse parse(ByteBuffer buffer, short version) { return new LeaderAndIsrResponse(ProtoUtils.parseResponse(ApiKeys.LEADER_AND_ISR.id, version, buffer)); } - public static LeaderAndIsrResponse parse(ByteBuffer buffer) { - return new LeaderAndIsrResponse(CURRENT_SCHEMA.read(buffer)); - } + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.LEADER_AND_ISR.id, version)); + + List<Struct> responseDatas = new ArrayList<>(responses.size()); + for (Map.Entry<TopicPartition, Errors> response : responses.entrySet()) { + Struct partitionData = struct.instance(PARTITIONS_KEY_NAME); + TopicPartition partition = response.getKey(); + partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic()); + partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition()); + partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue().code()); + responseDatas.add(partitionData); + } + struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray()); + struct.set(ERROR_CODE_KEY_NAME, error.code()); + + return struct; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java index 2a7b70e..16622e4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java @@ -33,8 +33,8 @@ public class LeaveGroupRequest extends AbstractRequest { } @Override - public LeaveGroupRequest build() { - return new LeaveGroupRequest(groupId, memberId, version()); + public LeaveGroupRequest build(short version) { + return new LeaveGroupRequest(groupId, memberId, version); } @Override @@ -52,16 +52,13 @@ public class LeaveGroupRequest extends AbstractRequest { private final String memberId; private LeaveGroupRequest(String groupId, String memberId, short version) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.LEAVE_GROUP.id, version)), - version); - struct.set(GROUP_ID_KEY_NAME, groupId); - struct.set(MEMBER_ID_KEY_NAME, memberId); + super(version); this.groupId = groupId; this.memberId = memberId; } public LeaveGroupRequest(Struct struct, short version) { - super(struct, version); + super(version); groupId = struct.getString(GROUP_ID_KEY_NAME); memberId = struct.getString(MEMBER_ID_KEY_NAME); } @@ -86,12 +83,15 @@ public class LeaveGroupRequest extends AbstractRequest { return memberId; } - public static LeaveGroupRequest parse(ByteBuffer buffer, int versionId) { - return new LeaveGroupRequest(ProtoUtils.parseRequest(ApiKeys.LEAVE_GROUP.id, versionId, buffer), - (short) versionId); + public static LeaveGroupRequest parse(ByteBuffer buffer, short versionId) { + return new LeaveGroupRequest(ProtoUtils.parseRequest(ApiKeys.LEAVE_GROUP.id, versionId, buffer), versionId); } - public static LeaveGroupRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.LEAVE_GROUP.id)); + @Override + protected Struct toStruct() { + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.LEAVE_GROUP.id, version())); + struct.set(GROUP_ID_KEY_NAME, groupId); + struct.set(MEMBER_ID_KEY_NAME, memberId); + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java index bd1c84d..a28816a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java @@ -12,16 +12,15 @@ */ package org.apache.kafka.common.requests; -import java.nio.ByteBuffer; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; +import java.nio.ByteBuffer; + public class LeaveGroupResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LEAVE_GROUP.id); private static final String ERROR_CODE_KEY_NAME = "error_code"; /** @@ -33,16 +32,13 @@ public class LeaveGroupResponse extends AbstractResponse { * UNKNOWN_CONSUMER_ID (25) * GROUP_AUTHORIZATION_FAILED (30) */ - private final Errors error; + public LeaveGroupResponse(Errors error) { - super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, error.code()); this.error = error; } public LeaveGroupResponse(Struct struct) { - super(struct); error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); } @@ -50,7 +46,15 @@ public class LeaveGroupResponse extends AbstractResponse { return error; } - public static LeaveGroupResponse parse(ByteBuffer buffer) { - return new LeaveGroupResponse(CURRENT_SCHEMA.read(buffer)); + @Override + public Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.LEAVE_GROUP.id, version)); + struct.set(ERROR_CODE_KEY_NAME, error.code()); + return struct; } + + public static LeaveGroupResponse parse(ByteBuffer buffer, short versionId) { + return new LeaveGroupResponse(ProtoUtils.parseResponse(ApiKeys.LEAVE_GROUP.id, versionId, buffer)); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java index 235f4e4..badb527 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java @@ -27,8 +27,8 @@ public class ListGroupsRequest extends AbstractRequest { } @Override - public ListGroupsRequest build() { - return new ListGroupsRequest(version()); + public ListGroupsRequest build(short version) { + return new ListGroupsRequest(version); } @Override @@ -38,12 +38,11 @@ public class ListGroupsRequest extends AbstractRequest { } public ListGroupsRequest(short version) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_GROUPS.id, version)), - version); + super(version); } public ListGroupsRequest(Struct struct, short versionId) { - super(struct, versionId); + super(versionId); } @Override @@ -58,12 +57,12 @@ public class ListGroupsRequest extends AbstractRequest { } } - public static ListGroupsRequest parse(ByteBuffer buffer, int versionId) { - return new ListGroupsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_GROUPS.id, versionId, buffer), - (short) versionId); + public static ListGroupsRequest parse(ByteBuffer buffer, short versionId) { + return new ListGroupsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_GROUPS.id, versionId, buffer), versionId); } - public static ListGroupsRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.LIST_GROUPS.id)); + @Override + protected Struct toStruct() { + return new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_GROUPS.id, version())); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java index f421064..e05a4b1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -15,7 +15,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -25,8 +24,6 @@ import java.util.List; public class ListGroupsResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_GROUPS.id); - public static final String ERROR_CODE_KEY_NAME = "error_code"; public static final String GROUPS_KEY_NAME = "groups"; public static final String GROUP_ID_KEY_NAME = "group_id"; @@ -43,22 +40,11 @@ public class ListGroupsResponse extends AbstractResponse { private final List<Group> groups; public ListGroupsResponse(Errors error, List<Group> groups) { - super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, error.code()); - List<Struct> groupList = new ArrayList<>(); - for (Group group : groups) { - Struct groupStruct = struct.instance(GROUPS_KEY_NAME); - groupStruct.set(GROUP_ID_KEY_NAME, group.groupId); - groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType); - groupList.add(groupStruct); - } - struct.set(GROUPS_KEY_NAME, groupList.toArray()); this.error = error; this.groups = groups; } public ListGroupsResponse(Struct struct) { - super(struct); this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); this.groups = new ArrayList<>(); for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) { @@ -96,12 +82,27 @@ public class ListGroupsResponse extends AbstractResponse { } - public static ListGroupsResponse parse(ByteBuffer buffer) { - return new ListGroupsResponse(CURRENT_SCHEMA.read(buffer)); + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.LIST_GROUPS.id, version)); + struct.set(ERROR_CODE_KEY_NAME, error.code()); + List<Struct> groupList = new ArrayList<>(); + for (Group group : groups) { + Struct groupStruct = struct.instance(GROUPS_KEY_NAME); + groupStruct.set(GROUP_ID_KEY_NAME, group.groupId); + groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType); + groupList.add(groupStruct); + } + struct.set(GROUPS_KEY_NAME, groupList.toArray()); + return struct; } public static ListGroupsResponse fromError(Errors error) { return new ListGroupsResponse(error, Collections.<Group>emptyList()); } + public static ListGroupsResponse parse(ByteBuffer buffer, short versionId) { + return new ListGroupsResponse(ProtoUtils.parseResponse(ApiKeys.LIST_GROUPS.id, versionId, buffer)); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 6214a56..3e2ad7c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -60,16 +60,21 @@ public class ListOffsetRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder<ListOffsetRequest> { private final int replicaId; + private final short minVersion; private Map<TopicPartition, PartitionData> offsetData = null; private Map<TopicPartition, Long> partitionTimestamps = null; - private short minVersion = (short) 0; - public Builder() { - this(CONSUMER_REPLICA_ID); + public static Builder forReplica(short desiredVersion, int replicaId) { + return new Builder((short) 0, desiredVersion, replicaId); } - public Builder(int replicaId) { - super(ApiKeys.LIST_OFFSETS); + public static Builder forConsumer(short minVersion) { + return new Builder(minVersion, null, CONSUMER_REPLICA_ID); + } + + private Builder(short minVersion, Short desiredVersion, int replicaId) { + super(ApiKeys.LIST_OFFSETS, desiredVersion); + this.minVersion = minVersion; this.replicaId = replicaId; } @@ -84,8 +89,7 @@ public class ListOffsetRequest extends AbstractRequest { } @Override - public ListOffsetRequest build() { - short version = version(); + public ListOffsetRequest build(short version) { if (version < minVersion) { throw new UnsupportedVersionException("Cannot create a v" + version + " ListOffsetRequest because " + "we require features supported only in " + minVersion + " or later."); @@ -117,14 +121,6 @@ public class ListOffsetRequest extends AbstractRequest { return new ListOffsetRequest(replicaId, m, version); } - /** - * Set the minimum version which we will produce for this request. - */ - public Builder setMinVersion(short minVersion) { - this.minVersion = minVersion; - return this; - } - @Override public String toString() { StringBuilder bld = new StringBuilder(); @@ -170,44 +166,15 @@ public class ListOffsetRequest extends AbstractRequest { */ @SuppressWarnings("unchecked") private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> targetTimes, short version) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_OFFSETS.id, version)), version); - Map<String, Map<Integer, Object>> topicsData = - CollectionUtils.groupDataByTopic((Map<TopicPartition, Object>) targetTimes); - - struct.set(REPLICA_ID_KEY_NAME, replicaId); - List<Struct> topicArray = new ArrayList<Struct>(); - for (Map.Entry<String, Map<Integer, Object>> topicEntry: topicsData.entrySet()) { - Struct topicData = struct.instance(TOPICS_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); - List<Struct> partitionArray = new ArrayList<>(); - for (Map.Entry<Integer, Object> partitionEntry : topicEntry.getValue().entrySet()) { - if (version == 0) { - PartitionData offsetPartitionData = (PartitionData) partitionEntry.getValue(); - Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); - partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); - partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp); - partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets); - partitionArray.add(partitionData); - } else { - Long timestamp = (Long) partitionEntry.getValue(); - Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); - partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); - partitionData.set(TIMESTAMP_KEY_NAME, timestamp); - partitionArray.add(partitionData); - } - } - topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); - topicArray.add(topicData); - } - struct.set(TOPICS_KEY_NAME, topicArray.toArray()); + super(version); this.replicaId = replicaId; this.offsetData = version == 0 ? (Map<TopicPartition, PartitionData>) targetTimes : null; this.partitionTimestamps = version == 1 ? (Map<TopicPartition, Long>) targetTimes : null; this.duplicatePartitions = Collections.emptySet(); } - public ListOffsetRequest(Struct struct, short versionId) { - super(struct, versionId); + public ListOffsetRequest(Struct struct, short version) { + super(version); Set<TopicPartition> duplicatePartitions = new HashSet<>(); replicaId = struct.getInt(REPLICA_ID_KEY_NAME); offsetData = new HashMap<>(); @@ -236,17 +203,19 @@ public class ListOffsetRequest extends AbstractRequest { @Override @SuppressWarnings("deprecation") public AbstractResponse getErrorResponse(Throwable e) { - Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>(); + Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); short versionId = version(); if (versionId == 0) { for (Map.Entry<TopicPartition, PartitionData> entry : offsetData.entrySet()) { - ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e), new ArrayList<Long>()); + ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData( + Errors.forException(e), Collections.<Long>emptyList()); responseData.put(entry.getKey(), partitionResponse); } } else { for (Map.Entry<TopicPartition, Long> entry : partitionTimestamps.entrySet()) { - ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e), -1L, -1L); + ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData( + Errors.forException(e), -1L, -1L); responseData.put(entry.getKey(), partitionResponse); } } @@ -254,7 +223,7 @@ public class ListOffsetRequest extends AbstractRequest { switch (versionId) { case 0: case 1: - return new ListOffsetResponse(responseData, versionId); + return new ListOffsetResponse(responseData); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id))); @@ -278,12 +247,44 @@ public class ListOffsetRequest extends AbstractRequest { return duplicatePartitions; } - public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) { - return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer), - (short) versionId); + public static ListOffsetRequest parse(ByteBuffer buffer, short versionId) { + return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer), versionId); } - public static ListOffsetRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id)); + @Override + protected Struct toStruct() { + short version = version(); + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.LIST_OFFSETS.id, version)); + + Map<TopicPartition, ?> targetTimes = partitionTimestamps == null ? offsetData : partitionTimestamps; + Map<String, Map<Integer, Object>> topicsData = CollectionUtils.groupDataByTopic(targetTimes); + + struct.set(REPLICA_ID_KEY_NAME, replicaId); + List<Struct> topicArray = new ArrayList<>(); + for (Map.Entry<String, Map<Integer, Object>> topicEntry: topicsData.entrySet()) { + Struct topicData = struct.instance(TOPICS_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); + List<Struct> partitionArray = new ArrayList<>(); + for (Map.Entry<Integer, Object> partitionEntry : topicEntry.getValue().entrySet()) { + if (version == 0) { + PartitionData offsetPartitionData = (PartitionData) partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp); + partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets); + partitionArray.add(partitionData); + } else { + Long timestamp = (Long) partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(TIMESTAMP_KEY_NAME, timestamp); + partitionArray.add(partitionData); + } + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(TOPICS_KEY_NAME, topicArray.toArray()); + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java index b815a53..cb3bafc 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.Utils; @@ -35,7 +34,6 @@ public class ListOffsetResponse extends AbstractResponse { public static final long UNKNOWN_TIMESTAMP = -1L; public static final long UNKNOWN_OFFSET = -1L; - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id); private static final String RESPONSES_KEY_NAME = "responses"; // topic level field names @@ -61,8 +59,6 @@ public class ListOffsetResponse extends AbstractResponse { private static final String TIMESTAMP_KEY_NAME = "timestamp"; private static final String OFFSET_KEY_NAME = "offset"; - private final Map<TopicPartition, PartitionData> responseData; - public static final class PartitionData { public final Errors error; // The offsets list is only used in ListOffsetResponse v0. @@ -110,46 +106,17 @@ public class ListOffsetResponse extends AbstractResponse { } } + private final Map<TopicPartition, PartitionData> responseData; + /** - * Constructor for ListOffsetResponse v0. + * Constructor for all versions. */ - @Deprecated public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) { - this(responseData, 0); - } - - public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData, int version) { - super(new Struct(ProtoUtils.responseSchema(ApiKeys.LIST_OFFSETS.id, version))); - Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData); - - List<Struct> topicArray = new ArrayList<Struct>(); - for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) { - Struct topicData = struct.instance(RESPONSES_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); - List<Struct> partitionArray = new ArrayList<Struct>(); - for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) { - PartitionData offsetPartitionData = partitionEntry.getValue(); - Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); - partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); - partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.error.code()); - if (version == 0) - partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray()); - else { - partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp); - partitionData.set(OFFSET_KEY_NAME, offsetPartitionData.offset); - } - partitionArray.add(partitionData); - } - topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); - topicArray.add(topicData); - } - struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); this.responseData = responseData; } public ListOffsetResponse(Struct struct) { - super(struct); - responseData = new HashMap<TopicPartition, PartitionData>(); + responseData = new HashMap<>(); for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicResponse = (Struct) topicResponseObj; String topic = topicResponse.getString(TOPIC_KEY_NAME); @@ -178,11 +145,38 @@ public class ListOffsetResponse extends AbstractResponse { return responseData; } - public static ListOffsetResponse parse(ByteBuffer buffer) { - return new ListOffsetResponse(CURRENT_SCHEMA.read(buffer)); + public static ListOffsetResponse parse(ByteBuffer buffer, short version) { + return new ListOffsetResponse(ProtoUtils.parseResponse(ApiKeys.LIST_OFFSETS.id, version, buffer)); } - public static ListOffsetResponse parse(ByteBuffer buffer, int version) { - return new ListOffsetResponse(ProtoUtils.responseSchema(ApiKeys.LIST_OFFSETS.id, version).read(buffer)); + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.LIST_OFFSETS.id, version)); + Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData); + + List<Struct> topicArray = new ArrayList<>(); + for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) { + Struct topicData = struct.instance(RESPONSES_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); + List<Struct> partitionArray = new ArrayList<>(); + for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) { + PartitionData offsetPartitionData = partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.error.code()); + if (version == 0) + partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray()); + else { + partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp); + partitionData.set(OFFSET_KEY_NAME, offsetPartitionData.offset); + } + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); + + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 16af1b7..f31315f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -51,8 +51,7 @@ public class MetadataRequest extends AbstractRequest { } @Override - public MetadataRequest build() { - short version = version(); + public MetadataRequest build(short version) { if (version < 1) { throw new UnsupportedVersionException("MetadataRequest " + "versions older than 1 are not supported."); @@ -79,27 +78,18 @@ public class MetadataRequest extends AbstractRequest { private final List<String> topics; - public static MetadataRequest allTopics(short version) { - return new MetadataRequest.Builder(null).setVersion(version).build(); - } - /** * In v0 null is not allowed and and empty list indicates requesting all topics. * Note: modern clients do not support sending v0 requests. * In v1 null indicates requesting all topics, and an empty list indicates requesting no topics. */ public MetadataRequest(List<String> topics, short version) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.METADATA.id, version)), - version); - if (topics == null) - struct.set(TOPICS_KEY_NAME, null); - else - struct.set(TOPICS_KEY_NAME, topics.toArray()); + super(version); this.topics = topics; } public MetadataRequest(Struct struct, short version) { - super(struct, version); + super(version); Object[] topicArray = struct.getArray(TOPICS_KEY_NAME); if (topicArray != null) { topics = new ArrayList<>(); @@ -127,7 +117,7 @@ public class MetadataRequest extends AbstractRequest { case 0: case 1: case 2: - return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas, versionId); + return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id))); @@ -142,12 +132,17 @@ public class MetadataRequest extends AbstractRequest { return topics; } - public static MetadataRequest parse(ByteBuffer buffer, int versionId) { - return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer), - (short) versionId); + public static MetadataRequest parse(ByteBuffer buffer, short versionId) { + return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer), versionId); } - public static MetadataRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.METADATA.id)); + @Override + protected Struct toStruct() { + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.METADATA.id, version())); + if (topics == null) + struct.set(TOPICS_KEY_NAME, null); + else + struct.set(TOPICS_KEY_NAME, topics.toArray()); + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index a8baee5..268bf84 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -31,7 +31,6 @@ import java.util.Set; public class MetadataResponse extends AbstractResponse { - private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.METADATA.id); private static final String BROKERS_KEY_NAME = "brokers"; private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata"; @@ -83,78 +82,16 @@ public class MetadataResponse extends AbstractResponse { private final String clusterId; /** - * Constructor for the latest version + * Constructor for all versions. */ public MetadataResponse(List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata) { - this(brokers, clusterId, controllerId, topicMetadata, CURRENT_VERSION); - } - - /** - * Constructor for a specific version - */ - public MetadataResponse(List<Node> brokers, String clusterId, int controllerId, List<TopicMetadata> topicMetadata, int version) { - super(new Struct(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version))); this.brokers = brokers; this.controller = getControllerNode(controllerId, brokers); this.topicMetadata = topicMetadata; this.clusterId = clusterId; - - List<Struct> brokerArray = new ArrayList<>(); - for (Node node : brokers) { - Struct broker = struct.instance(BROKERS_KEY_NAME); - broker.set(NODE_ID_KEY_NAME, node.id()); - broker.set(HOST_KEY_NAME, node.host()); - broker.set(PORT_KEY_NAME, node.port()); - // This field only exists in v1+ - if (broker.hasField(RACK_KEY_NAME)) - broker.set(RACK_KEY_NAME, node.rack()); - brokerArray.add(broker); - } - struct.set(BROKERS_KEY_NAME, brokerArray.toArray()); - - // This field only exists in v1+ - if (struct.hasField(CONTROLLER_ID_KEY_NAME)) - struct.set(CONTROLLER_ID_KEY_NAME, controllerId); - - // This field only exists in v2+ - if (struct.hasField(CLUSTER_ID_KEY_NAME)) - struct.set(CLUSTER_ID_KEY_NAME, clusterId); - - List<Struct> topicMetadataArray = new ArrayList<>(topicMetadata.size()); - for (TopicMetadata metadata : topicMetadata) { - Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, metadata.topic); - topicData.set(TOPIC_ERROR_CODE_KEY_NAME, metadata.error.code()); - // This field only exists in v1+ - if (topicData.hasField(IS_INTERNAL_KEY_NAME)) - topicData.set(IS_INTERNAL_KEY_NAME, metadata.isInternal()); - - List<Struct> partitionMetadataArray = new ArrayList<>(metadata.partitionMetadata.size()); - for (PartitionMetadata partitionMetadata : metadata.partitionMetadata()) { - Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME); - partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, partitionMetadata.error.code()); - partitionData.set(PARTITION_KEY_NAME, partitionMetadata.partition); - partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id()); - ArrayList<Integer> replicas = new ArrayList<>(partitionMetadata.replicas.size()); - for (Node node : partitionMetadata.replicas) - replicas.add(node.id()); - partitionData.set(REPLICAS_KEY_NAME, replicas.toArray()); - ArrayList<Integer> isr = new ArrayList<>(partitionMetadata.isr.size()); - for (Node node : partitionMetadata.isr) - isr.add(node.id()); - partitionData.set(ISR_KEY_NAME, isr.toArray()); - partitionMetadataArray.add(partitionData); - - } - topicData.set(PARTITION_METADATA_KEY_NAME, partitionMetadataArray.toArray()); - topicMetadataArray.add(topicData); - } - struct.set(TOPIC_METADATA_KEY_NAME, topicMetadataArray.toArray()); } public MetadataResponse(Struct struct) { - super(struct); - Map<Integer, Node> brokers = new HashMap<>(); Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME); for (int i = 0; i < brokerStructs.length; i++) { @@ -317,12 +254,8 @@ public class MetadataResponse extends AbstractResponse { return this.clusterId; } - public static MetadataResponse parse(ByteBuffer buffer) { - return parse(buffer, CURRENT_VERSION); - } - - public static MetadataResponse parse(ByteBuffer buffer, int version) { - return new MetadataResponse(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version).read(buffer)); + public static MetadataResponse parse(ByteBuffer buffer, short version) { + return new MetadataResponse(ProtoUtils.parseResponse(ApiKeys.METADATA.id, version, buffer)); } public static class TopicMetadata { @@ -400,4 +333,60 @@ public class MetadataResponse extends AbstractResponse { } + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.METADATA.id, version)); + List<Struct> brokerArray = new ArrayList<>(); + for (Node node : brokers) { + Struct broker = struct.instance(BROKERS_KEY_NAME); + broker.set(NODE_ID_KEY_NAME, node.id()); + broker.set(HOST_KEY_NAME, node.host()); + broker.set(PORT_KEY_NAME, node.port()); + // This field only exists in v1+ + if (broker.hasField(RACK_KEY_NAME)) + broker.set(RACK_KEY_NAME, node.rack()); + brokerArray.add(broker); + } + struct.set(BROKERS_KEY_NAME, brokerArray.toArray()); + + // This field only exists in v1+ + if (struct.hasField(CONTROLLER_ID_KEY_NAME)) + struct.set(CONTROLLER_ID_KEY_NAME, controller == null ? NO_CONTROLLER_ID : controller.id()); + + // This field only exists in v2+ + if (struct.hasField(CLUSTER_ID_KEY_NAME)) + struct.set(CLUSTER_ID_KEY_NAME, clusterId); + + List<Struct> topicMetadataArray = new ArrayList<>(topicMetadata.size()); + for (TopicMetadata metadata : topicMetadata) { + Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, metadata.topic); + topicData.set(TOPIC_ERROR_CODE_KEY_NAME, metadata.error.code()); + // This field only exists in v1+ + if (topicData.hasField(IS_INTERNAL_KEY_NAME)) + topicData.set(IS_INTERNAL_KEY_NAME, metadata.isInternal()); + + List<Struct> partitionMetadataArray = new ArrayList<>(metadata.partitionMetadata.size()); + for (PartitionMetadata partitionMetadata : metadata.partitionMetadata()) { + Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME); + partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, partitionMetadata.error.code()); + partitionData.set(PARTITION_KEY_NAME, partitionMetadata.partition); + partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id()); + ArrayList<Integer> replicas = new ArrayList<>(partitionMetadata.replicas.size()); + for (Node node : partitionMetadata.replicas) + replicas.add(node.id()); + partitionData.set(REPLICAS_KEY_NAME, replicas.toArray()); + ArrayList<Integer> isr = new ArrayList<>(partitionMetadata.isr.size()); + for (Node node : partitionMetadata.isr) + isr.add(node.id()); + partitionData.set(ISR_KEY_NAME, isr.toArray()); + partitionMetadataArray.add(partitionData); + + } + topicData.set(PARTITION_METADATA_KEY_NAME, partitionMetadataArray.toArray()); + topicMetadataArray.add(topicData); + } + struct.set(TOPIC_METADATA_KEY_NAME, topicMetadataArray.toArray()); + return struct; + } }