http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 6dd1197..bf14f10 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -124,17 +124,15 @@ public class OffsetCommitRequest extends AbstractRequest { } @Override - public OffsetCommitRequest build() { - short version = version(); + public OffsetCommitRequest build(short version) { switch (version) { case 0: - return new OffsetCommitRequest(groupId, offsetData); + return new OffsetCommitRequest(groupId, DEFAULT_GENERATION_ID, DEFAULT_MEMBER_ID, + DEFAULT_RETENTION_TIME, offsetData, version); case 1: - return new OffsetCommitRequest(groupId, generationId, memberId, - offsetData); case 2: - return new OffsetCommitRequest(groupId, generationId, memberId, - retentionTime, offsetData, version); + long retentionTime = version == 1 ? DEFAULT_RETENTION_TIME : this.retentionTime; + return new OffsetCommitRequest(groupId, generationId, memberId, retentionTime, offsetData, version); default: throw new UnsupportedVersionException("Unsupported version " + version); } @@ -154,57 +152,9 @@ public class OffsetCommitRequest extends AbstractRequest { } } - /** - * Constructor for version 0. - * @param groupId - * @param offsetData - */ - private OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)), (short) 0); - - initCommonFields(groupId, offsetData); - this.groupId = groupId; - this.generationId = DEFAULT_GENERATION_ID; - this.memberId = DEFAULT_MEMBER_ID; - this.retentionTime = DEFAULT_RETENTION_TIME; - this.offsetData = offsetData; - } - - /** - * Constructor for version 1. - * @param groupId - * @param generationId - * @param memberId - * @param offsetData - */ - private OffsetCommitRequest(String groupId, int generationId, String memberId, Map<TopicPartition, PartitionData> offsetData) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)), (short) 1); - - initCommonFields(groupId, offsetData); - struct.set(GENERATION_ID_KEY_NAME, generationId); - struct.set(MEMBER_ID_KEY_NAME, memberId); - this.groupId = groupId; - this.generationId = generationId; - this.memberId = memberId; - this.retentionTime = DEFAULT_RETENTION_TIME; - this.offsetData = offsetData; - } - - /** - * Constructor for version 2 and above. - * @param groupId - * @param generationId - * @param memberId - * @param retentionTime - * @param offsetData - */ private OffsetCommitRequest(String groupId, int generationId, String memberId, long retentionTime, Map<TopicPartition, PartitionData> offsetData, short version) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, version)), version); - initCommonFields(groupId, offsetData); - struct.set(GENERATION_ID_KEY_NAME, generationId); - struct.set(MEMBER_ID_KEY_NAME, memberId); - struct.set(RETENTION_TIME_KEY_NAME, retentionTime); + super(version); this.groupId = groupId; this.generationId = generationId; this.memberId = memberId; @@ -212,35 +162,8 @@ public class OffsetCommitRequest extends AbstractRequest { this.offsetData = offsetData; } - private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData) { - Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData); - - struct.set(GROUP_ID_KEY_NAME, groupId); - List<Struct> topicArray = new ArrayList<Struct>(); - - for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) { - Struct topicData = struct.instance(TOPICS_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); - List<Struct> partitionArray = new ArrayList<>(); - for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) { - PartitionData fetchPartitionData = partitionEntry.getValue(); - Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); - partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); - partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset); - // Only for v1 - if (partitionData.hasField(TIMESTAMP_KEY_NAME)) - partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp); - partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata); - partitionArray.add(partitionData); - } - topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); - topicArray.add(topicData); - } - struct.set(TOPICS_KEY_NAME, topicArray.toArray()); - } - public OffsetCommitRequest(Struct struct, short versionId) { - super(struct, versionId); + super(versionId); groupId = struct.getString(GROUP_ID_KEY_NAME); // This field only exists in v1. @@ -284,6 +207,42 @@ public class OffsetCommitRequest extends AbstractRequest { } @Override + public Struct toStruct() { + short version = version(); + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, version)); + struct.set(GROUP_ID_KEY_NAME, groupId); + + Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData); + List<Struct> topicArray = new ArrayList<>(); + for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) { + Struct topicData = struct.instance(TOPICS_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, topicEntry.getKey()); + List<Struct> partitionArray = new ArrayList<>(); + for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) { + PartitionData fetchPartitionData = partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset); + // Only for v1 + if (partitionData.hasField(TIMESTAMP_KEY_NAME)) + partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp); + partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata); + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(TOPICS_KEY_NAME, topicArray.toArray()); + if (struct.hasField(GENERATION_ID_KEY_NAME)) + struct.set(GENERATION_ID_KEY_NAME, generationId); + if (struct.hasField(MEMBER_ID_KEY_NAME)) + struct.set(MEMBER_ID_KEY_NAME, memberId); + if (struct.hasField(RETENTION_TIME_KEY_NAME)) + struct.set(RETENTION_TIME_KEY_NAME, retentionTime); + return struct; + } + + @Override public AbstractResponse getErrorResponse(Throwable e) { Map<TopicPartition, Errors> responseData = new HashMap<>(); for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) { @@ -322,12 +281,8 @@ public class OffsetCommitRequest extends AbstractRequest { return offsetData; } - public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) { + public static OffsetCommitRequest parse(ByteBuffer buffer, short versionId) { Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId); - return new OffsetCommitRequest(schema.read(buffer), (short) versionId); - } - - public static OffsetCommitRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.OFFSET_COMMIT.id)); + return new OffsetCommitRequest(schema.read(buffer), versionId); } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index 8a00c6b..b5709e2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -16,7 +16,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; @@ -28,7 +27,6 @@ import java.util.Map; public class OffsetCommitResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id); private static final String RESPONSES_KEY_NAME = "responses"; // topic level fields @@ -58,15 +56,33 @@ public class OffsetCommitResponse extends AbstractResponse { private final Map<TopicPartition, Errors> responseData; public OffsetCommitResponse(Map<TopicPartition, Errors> responseData) { - super(new Struct(CURRENT_SCHEMA)); + this.responseData = responseData; + } - Map<String, Map<Integer, Errors>> topicsData = CollectionUtils.groupDataByTopic(responseData); + public OffsetCommitResponse(Struct struct) { + responseData = new HashMap<>(); + for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { + Struct topicResponse = (Struct) topicResponseObj; + String topic = topicResponse.getString(TOPIC_KEY_NAME); + for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { + Struct partitionResponse = (Struct) partitionResponseObj; + int partition = partitionResponse.getInt(PARTITION_KEY_NAME); + Errors error = Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME)); + responseData.put(new TopicPartition(topic, partition), error); + } + } + } + + @Override + public Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_COMMIT.id, version)); - List<Struct> topicArray = new ArrayList<Struct>(); + Map<String, Map<Integer, Errors>> topicsData = CollectionUtils.groupDataByTopic(responseData); + List<Struct> topicArray = new ArrayList<>(); for (Map.Entry<String, Map<Integer, Errors>> entries: topicsData.entrySet()) { Struct topicData = struct.instance(RESPONSES_KEY_NAME); topicData.set(TOPIC_KEY_NAME, entries.getKey()); - List<Struct> partitionArray = new ArrayList<Struct>(); + List<Struct> partitionArray = new ArrayList<>(); for (Map.Entry<Integer, Errors> partitionEntry : entries.getValue().entrySet()) { Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); @@ -77,34 +93,16 @@ public class OffsetCommitResponse extends AbstractResponse { topicArray.add(topicData); } struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); - this.responseData = responseData; - } - public OffsetCommitResponse(Struct struct) { - super(struct); - responseData = new HashMap<TopicPartition, Errors>(); - for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.getString(TOPIC_KEY_NAME); - for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { - Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.getInt(PARTITION_KEY_NAME); - Errors error = Errors.forCode(partitionResponse.getShort(ERROR_CODE_KEY_NAME)); - responseData.put(new TopicPartition(topic, partition), error); - } - } + return struct; } public Map<TopicPartition, Errors> responseData() { return responseData; } - public static OffsetCommitResponse parse(ByteBuffer buffer, int version) { - Schema schema = ProtoUtils.responseSchema(ApiKeys.OFFSET_COMMIT.id, version); - return new OffsetCommitResponse(schema.read(buffer)); + public static OffsetCommitResponse parse(ByteBuffer buffer, short version) { + return new OffsetCommitResponse(ProtoUtils.parseResponse(ApiKeys.OFFSET_COMMIT.id, version, buffer)); } - public static OffsetCommitResponse parse(ByteBuffer buffer) { - return new OffsetCommitResponse(CURRENT_SCHEMA.read(buffer)); - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 43ddf88..2a550e5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -58,11 +58,11 @@ public class OffsetFetchRequest extends AbstractRequest { } @Override - public OffsetFetchRequest build() { - if (isAllTopicPartitions() && version() < 2) + public OffsetFetchRequest build(short version) { + if (isAllTopicPartitions() && version < 2) throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " + - "v" + version() + ", but we need v2 or newer to request all topic partitions."); - return new OffsetFetchRequest(groupId, partitions, version()); + "v" + version + ", but we need v2 or newer to request all topic partitions."); + return new OffsetFetchRequest(groupId, partitions, version); } @Override @@ -80,39 +80,18 @@ public class OffsetFetchRequest extends AbstractRequest { private final List<TopicPartition> partitions; public static OffsetFetchRequest forAllPartitions(String groupId) { - return new OffsetFetchRequest.Builder(groupId, null).setVersion((short) 2).build(); + return new OffsetFetchRequest.Builder(groupId, null).build((short) 2); } // v0, v1, and v2 have the same fields. private OffsetFetchRequest(String groupId, List<TopicPartition> partitions, short version) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_FETCH.id, version)), version); - struct.set(GROUP_ID_KEY_NAME, groupId); - if (partitions != null) { - Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions); - - List<Struct> topicArray = new ArrayList<>(); - for (Map.Entry<String, List<Integer>> entries : topicsData.entrySet()) { - Struct topicData = struct.instance(TOPICS_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, entries.getKey()); - List<Struct> partitionArray = new ArrayList<>(); - for (Integer partitionId : entries.getValue()) { - Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); - partitionData.set(PARTITION_KEY_NAME, partitionId); - partitionArray.add(partitionData); - } - topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); - topicArray.add(topicData); - } - struct.set(TOPICS_KEY_NAME, topicArray.toArray()); - } else - struct.set(TOPICS_KEY_NAME, null); - + super(version); this.groupId = groupId; this.partitions = partitions; } - public OffsetFetchRequest(Struct struct, short versionId) { - super(struct, versionId); + public OffsetFetchRequest(Struct struct, short version) { + super(version); Object[] topicArray = struct.getArray(TOPICS_KEY_NAME); if (topicArray != null) { @@ -150,7 +129,7 @@ public class OffsetFetchRequest extends AbstractRequest { case 0: case 1: case 2: - return new OffsetFetchResponse(error, responsePartitions, versionId); + return new OffsetFetchResponse(error, responsePartitions); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id))); @@ -170,16 +149,38 @@ public class OffsetFetchRequest extends AbstractRequest { return partitions; } - public static OffsetFetchRequest parse(ByteBuffer buffer, int versionId) { - return new OffsetFetchRequest(ProtoUtils.parseRequest(ApiKeys.OFFSET_FETCH.id, versionId, buffer), - (short) versionId); - } - - public static OffsetFetchRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id)); + public static OffsetFetchRequest parse(ByteBuffer buffer, short versionId) { + return new OffsetFetchRequest(ProtoUtils.parseRequest(ApiKeys.OFFSET_FETCH.id, versionId, buffer), versionId); } public boolean isAllPartitions() { return partitions == null; } + + @Override + protected Struct toStruct() { + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_FETCH.id, version())); + struct.set(GROUP_ID_KEY_NAME, groupId); + if (partitions != null) { + Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions); + + List<Struct> topicArray = new ArrayList<>(); + for (Map.Entry<String, List<Integer>> entries : topicsData.entrySet()) { + Struct topicData = struct.instance(TOPICS_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, entries.getKey()); + List<Struct> partitionArray = new ArrayList<>(); + for (Integer partitionId : entries.getValue()) { + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionId); + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(TOPICS_KEY_NAME, topicArray.toArray()); + } else + struct.set(TOPICS_KEY_NAME, null); + + return struct; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 9c14155..94de4b1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -24,14 +24,11 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; public class OffsetFetchResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id); - private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.OFFSET_FETCH.id); private static final String RESPONSES_KEY_NAME = "responses"; private static final String ERROR_CODE_KEY_NAME = "error_code"; @@ -85,51 +82,16 @@ public class OffsetFetchResponse extends AbstractResponse { } /** - * Constructor for the latest version. - * @param error Potential coordinator or group level error code - * @param responseData Fetched offset information grouped by topic-partition - */ - public OffsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) { - this(error, responseData, CURRENT_VERSION); - } - - /** - * Unified constructor for all versions. + * Constructor for all versions. * @param error Potential coordinator or group level error code (for api version 2 and later) * @param responseData Fetched offset information grouped by topic-partition - * @param version The request API version */ - public OffsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData, int version) { - super(new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version))); - - Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData); - List<Struct> topicArray = new ArrayList<>(); - for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) { - Struct topicData = this.struct.instance(RESPONSES_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, entries.getKey()); - List<Struct> partitionArray = new ArrayList<>(); - for (Map.Entry<Integer, PartitionData> partitionEntry : entries.getValue().entrySet()) { - PartitionData fetchPartitionData = partitionEntry.getValue(); - Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); - partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); - partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset); - partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata); - partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code()); - partitionArray.add(partitionData); - } - topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); - topicArray.add(topicData); - } - - this.struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); + public OffsetFetchResponse(Errors error, Map<TopicPartition, PartitionData> responseData) { this.responseData = responseData; this.error = error; - if (version > 1) - this.struct.set(ERROR_CODE_KEY_NAME, this.error.code()); } public OffsetFetchResponse(Struct struct) { - super(struct); Errors topLevelError = Errors.NONE; this.responseData = new HashMap<>(); for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { @@ -175,12 +137,37 @@ public class OffsetFetchResponse extends AbstractResponse { return responseData; } - public static OffsetFetchResponse parse(ByteBuffer buffer, int version) { - Schema schema = ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version); - return new OffsetFetchResponse(schema.read(buffer)); + public static OffsetFetchResponse parse(ByteBuffer buffer, short version) { + return new OffsetFetchResponse(ProtoUtils.parseResponse(ApiKeys.OFFSET_FETCH.id, version, buffer)); } - public static OffsetFetchResponse parse(ByteBuffer buffer) { - return new OffsetFetchResponse(CURRENT_SCHEMA.read(buffer)); + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version)); + + Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData); + List<Struct> topicArray = new ArrayList<>(); + for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) { + Struct topicData = struct.instance(RESPONSES_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, entries.getKey()); + List<Struct> partitionArray = new ArrayList<>(); + for (Map.Entry<Integer, PartitionData> partitionEntry : entries.getValue().entrySet()) { + PartitionData fetchPartitionData = partitionEntry.getValue(); + Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); + partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); + partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset); + partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata); + partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.error.code()); + partitionArray.add(partitionData); + } + topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); + } + struct.set(RESPONSES_KEY_NAME, topicArray.toArray()); + + if (version > 1) + struct.set(ERROR_CODE_KEY_NAME, this.error.code()); + + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index bd3ae8f..df70e20 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -55,11 +55,10 @@ public class ProduceRequest extends AbstractRequest { } @Override - public ProduceRequest build() { - short version = version(); - if (version < 2) { + public ProduceRequest build(short version) { + if (version < 2) throw new UnsupportedVersionException("ProduceRequest versions older than 2 are not supported."); - } + return new ProduceRequest(version, acks, timeout, partitionRecords); } @@ -80,33 +79,14 @@ public class ProduceRequest extends AbstractRequest { private final Map<TopicPartition, MemoryRecords> partitionRecords; private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, version)), version); - Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords); - struct.set(ACKS_KEY_NAME, acks); - struct.set(TIMEOUT_KEY_NAME, timeout); - List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size()); - for (Map.Entry<String, Map<Integer, MemoryRecords>> entry : recordsByTopic.entrySet()) { - Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, entry.getKey()); - List<Struct> partitionArray = new ArrayList<>(); - for (Map.Entry<Integer, MemoryRecords> partitionEntry : entry.getValue().entrySet()) { - MemoryRecords records = partitionEntry.getValue(); - Struct part = topicData.instance(PARTITION_DATA_KEY_NAME) - .set(PARTITION_KEY_NAME, partitionEntry.getKey()) - .set(RECORD_SET_KEY_NAME, records); - partitionArray.add(part); - } - topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray()); - topicDatas.add(topicData); - } - struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray()); + super(version); this.acks = acks; this.timeout = timeout; this.partitionRecords = partitionRecords; } public ProduceRequest(Struct struct, short version) { - super(struct, version); + super(version); partitionRecords = new HashMap<>(); for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) { Struct topicData = (Struct) topicDataObj; @@ -122,6 +102,34 @@ public class ProduceRequest extends AbstractRequest { timeout = struct.getInt(TIMEOUT_KEY_NAME); } + /** + * Visible for testing. + */ + @Override + public Struct toStruct() { + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, version())); + Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords); + struct.set(ACKS_KEY_NAME, acks); + struct.set(TIMEOUT_KEY_NAME, timeout); + List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size()); + for (Map.Entry<String, Map<Integer, MemoryRecords>> entry : recordsByTopic.entrySet()) { + Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, entry.getKey()); + List<Struct> partitionArray = new ArrayList<>(); + for (Map.Entry<Integer, MemoryRecords> partitionEntry : entry.getValue().entrySet()) { + MemoryRecords records = partitionEntry.getValue(); + Struct part = topicData.instance(PARTITION_DATA_KEY_NAME) + .set(PARTITION_KEY_NAME, partitionEntry.getKey()) + .set(RECORD_SET_KEY_NAME, records); + partitionArray.add(part); + } + topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray()); + topicDatas.add(topicData); + } + struct.set(TOPIC_DATA_KEY_NAME, topicDatas.toArray()); + return struct; + } + @Override public AbstractResponse getErrorResponse(Throwable e) { /* In case the producer doesn't actually want any response */ @@ -137,10 +145,9 @@ public class ProduceRequest extends AbstractRequest { short versionId = version(); switch (versionId) { case 0: - return new ProduceResponse(responseMap); case 1: case 2: - return new ProduceResponse(responseMap, ProduceResponse.DEFAULT_THROTTLE_TIME, versionId); + return new ProduceResponse(responseMap); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id))); @@ -160,15 +167,10 @@ public class ProduceRequest extends AbstractRequest { } public void clearPartitionRecords() { - struct.clear(); partitionRecords.clear(); } - public static ProduceRequest parse(ByteBuffer buffer, int versionId) { - return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer), (short) versionId); - } - - public static ProduceRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)); + public static ProduceRequest parse(ByteBuffer buffer, short versionId) { + return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer), versionId); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 9eaaadf..7a022af 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -16,7 +16,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.utils.CollectionUtils; @@ -31,8 +30,7 @@ import java.util.Map; * This wrapper supports both v0 and v1 of ProduceResponse. */ public class ProduceResponse extends AbstractResponse { - - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id); + private static final String RESPONSES_KEY_NAME = "responses"; // topic level field names @@ -73,10 +71,7 @@ public class ProduceResponse extends AbstractResponse { * @param responses Produced data grouped by topic-partition */ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) { - super(new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0))); - initCommonFields(responses); - this.responses = responses; - this.throttleTime = DEFAULT_THROTTLE_TIME; + this(responses, DEFAULT_THROTTLE_TIME); } /** @@ -85,30 +80,14 @@ public class ProduceResponse extends AbstractResponse { * @param throttleTime Time in milliseconds the response was throttled */ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTime) { - this(responses, throttleTime, ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)); - } - - /** - * Constructor for a specific version - * @param responses Produced data grouped by topic-partition - * @param throttleTime Time in milliseconds the response was throttled - * @param version the version of schema to use. - */ - public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTime, int version) { - super(new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, version))); - initCommonFields(responses); - if (struct.hasField(THROTTLE_TIME_KEY_NAME)) - struct.set(THROTTLE_TIME_KEY_NAME, throttleTime); this.responses = responses; this.throttleTime = throttleTime; } /** - * Constructor from a {@link Struct}. It is the caller's responsibility to pass in a struct with the latest schema. - * @param struct + * Constructor from a {@link Struct}. */ public ProduceResponse(Struct struct) { - super(struct); responses = new HashMap<>(); for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) { Struct topicRespStruct = (Struct) topicResponse; @@ -126,7 +105,10 @@ public class ProduceResponse extends AbstractResponse { this.throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME); } - private void initCommonFields(Map<TopicPartition, PartitionResponse> responses) { + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, version)); + Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses); List<Struct> topicDatas = new ArrayList<>(responseByTopic.size()); for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) { @@ -140,13 +122,17 @@ public class ProduceResponse extends AbstractResponse { .set(ERROR_CODE_KEY_NAME, part.error.code()) .set(BASE_OFFSET_KEY_NAME, part.baseOffset); if (partStruct.hasField(LOG_APPEND_TIME_KEY_NAME)) - partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); + partStruct.set(LOG_APPEND_TIME_KEY_NAME, part.logAppendTime); partitionArray.add(partStruct); } topicData.set(PARTITION_RESPONSES_KEY_NAME, partitionArray.toArray()); topicDatas.add(topicData); } struct.set(RESPONSES_KEY_NAME, topicDatas.toArray()); + + if (struct.hasField(THROTTLE_TIME_KEY_NAME)) + struct.set(THROTTLE_TIME_KEY_NAME, throttleTime); + return struct; } public Map<TopicPartition, PartitionResponse> responses() { @@ -187,7 +173,7 @@ public class ProduceResponse extends AbstractResponse { } } - public static ProduceResponse parse(ByteBuffer buffer) { - return new ProduceResponse(CURRENT_SCHEMA.read(buffer)); + public static ProduceResponse parse(ByteBuffer buffer, short version) { + return new ProduceResponse(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, version).read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/RequestAndSize.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestAndSize.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestAndSize.java new file mode 100644 index 0000000..d2147b3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestAndSize.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +public class RequestAndSize { + public final AbstractRequest request; + public final int size; + + public RequestAndSize(AbstractRequest request, int size) { + this.request = request; + this.size = size; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java index 05b78cb..5e65132 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java @@ -35,8 +35,7 @@ public class RequestHeader extends AbstractRequestResponse { private final String clientId; private final int correlationId; - public RequestHeader(Struct header) { - super(header); + public RequestHeader(Struct struct) { apiKey = struct.getShort(API_KEY_FIELD); apiVersion = struct.getShort(API_VERSION_FIELD); clientId = struct.getString(CLIENT_ID_FIELD); @@ -44,17 +43,21 @@ public class RequestHeader extends AbstractRequestResponse { } public RequestHeader(short apiKey, short version, String client, int correlation) { - super(new Struct(Protocol.REQUEST_HEADER)); - struct.set(API_KEY_FIELD, apiKey); - struct.set(API_VERSION_FIELD, version); - struct.set(CLIENT_ID_FIELD, client); - struct.set(CORRELATION_ID_FIELD, correlation); this.apiKey = apiKey; this.apiVersion = version; this.clientId = client; this.correlationId = correlation; } + public Struct toStruct() { + Struct struct = new Struct(Protocol.REQUEST_HEADER); + struct.set(API_KEY_FIELD, apiKey); + struct.set(API_VERSION_FIELD, apiVersion); + struct.set(CLIENT_ID_FIELD, clientId); + struct.set(CORRELATION_ID_FIELD, correlationId); + return struct; + } + public short apiKey() { return apiKey; } @@ -71,6 +74,10 @@ public class RequestHeader extends AbstractRequestResponse { return correlationId; } + public ResponseHeader toResponseHeader() { + return new ResponseHeader(correlationId); + } + public static RequestHeader parse(ByteBuffer buffer) { return new RequestHeader(Protocol.REQUEST_HEADER.read(buffer)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java index e68bd39..04390ea 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java @@ -34,17 +34,24 @@ public class ResponseHeader extends AbstractRequestResponse { private final int correlationId; - public ResponseHeader(Struct header) { - super(header); + public ResponseHeader(Struct struct) { correlationId = struct.getInt(CORRELATION_KEY_FIELD); } public ResponseHeader(int correlationId) { - super(new Struct(Protocol.RESPONSE_HEADER)); - struct.set(CORRELATION_KEY_FIELD, correlationId); this.correlationId = correlationId; } + public int sizeOf() { + return toStruct().sizeOf(); + } + + public Struct toStruct() { + Struct struct = new Struct(Protocol.RESPONSE_HEADER); + struct.set(CORRELATION_KEY_FIELD, correlationId); + return struct; + } + public int correlationId() { return correlationId; } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java index d244f0a..a1f3f0e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java @@ -25,7 +25,6 @@ import java.util.List; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -40,19 +39,17 @@ import org.apache.kafka.common.protocol.types.Struct; */ public class SaslHandshakeRequest extends AbstractRequest { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.SASL_HANDSHAKE.id); public static final String MECHANISM_KEY_NAME = "mechanism"; private final String mechanism; public SaslHandshakeRequest(String mechanism) { - super(new Struct(CURRENT_SCHEMA), ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id)); - struct.set(MECHANISM_KEY_NAME, mechanism); + super(ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id)); this.mechanism = mechanism; } - public SaslHandshakeRequest(Struct struct, short versionId) { - super(struct, versionId); + public SaslHandshakeRequest(Struct struct, short version) { + super(version); mechanism = struct.getString(MECHANISM_KEY_NAME); } @@ -73,13 +70,15 @@ public class SaslHandshakeRequest extends AbstractRequest { } } - public static SaslHandshakeRequest parse(ByteBuffer buffer, int versionId) { - return new SaslHandshakeRequest(ProtoUtils.parseRequest(ApiKeys.SASL_HANDSHAKE.id, versionId, buffer), - (short) versionId); + public static SaslHandshakeRequest parse(ByteBuffer buffer, short versionId) { + return new SaslHandshakeRequest(ProtoUtils.parseRequest(ApiKeys.SASL_HANDSHAKE.id, versionId, buffer), versionId); } - public static SaslHandshakeRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id)); + @Override + protected Struct toStruct() { + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.SASL_HANDSHAKE.id, version())); + struct.set(MECHANISM_KEY_NAME, mechanism); + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java index f50c5be..9d38c6a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java @@ -26,7 +26,6 @@ import java.util.List; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; @@ -36,8 +35,6 @@ import org.apache.kafka.common.protocol.types.Struct; */ public class SaslHandshakeResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SASL_HANDSHAKE.id); - private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String ENABLED_MECHANISMS_KEY_NAME = "enabled_mechanisms"; @@ -50,15 +47,11 @@ public class SaslHandshakeResponse extends AbstractResponse { private final List<String> enabledMechanisms; public SaslHandshakeResponse(Errors error, Collection<String> enabledMechanisms) { - super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, error.code()); - struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray()); this.error = error; this.enabledMechanisms = new ArrayList<>(enabledMechanisms); } public SaslHandshakeResponse(Struct struct) { - super(struct); error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); Object[] mechanisms = struct.getArray(ENABLED_MECHANISMS_KEY_NAME); ArrayList<String> enabledMechanisms = new ArrayList<>(); @@ -71,15 +64,19 @@ public class SaslHandshakeResponse extends AbstractResponse { return error; } - public List<String> enabledMechanisms() { - return enabledMechanisms; + @Override + public Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.SASL_HANDSHAKE.id, version)); + struct.set(ERROR_CODE_KEY_NAME, error.code()); + struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray()); + return struct; } - public static SaslHandshakeResponse parse(ByteBuffer buffer) { - return new SaslHandshakeResponse(CURRENT_SCHEMA.read(buffer)); + public List<String> enabledMechanisms() { + return enabledMechanisms; } - public static SaslHandshakeResponse parse(ByteBuffer buffer, int version) { + public static SaslHandshakeResponse parse(ByteBuffer buffer, short version) { return new SaslHandshakeResponse(ProtoUtils.parseResponse(ApiKeys.SASL_HANDSHAKE.id, version, buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java index ff2638b..91806f1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java @@ -52,9 +52,9 @@ public class StopReplicaRequest extends AbstractRequest { } @Override - public StopReplicaRequest build() { + public StopReplicaRequest build(short version) { return new StopReplicaRequest(controllerId, controllerEpoch, - deletePartitions, partitions, version()); + deletePartitions, partitions, version); } @Override @@ -77,30 +77,15 @@ public class StopReplicaRequest extends AbstractRequest { private StopReplicaRequest(int controllerId, int controllerEpoch, boolean deletePartitions, Set<TopicPartition> partitions, short version) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.STOP_REPLICA.id, version)), version); - - struct.set(CONTROLLER_ID_KEY_NAME, controllerId); - struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); - struct.set(DELETE_PARTITIONS_KEY_NAME, deletePartitions); - - List<Struct> partitionDatas = new ArrayList<>(partitions.size()); - for (TopicPartition partition : partitions) { - Struct partitionData = struct.instance(PARTITIONS_KEY_NAME); - partitionData.set(TOPIC_KEY_NAME, partition.topic()); - partitionData.set(PARTITION_KEY_NAME, partition.partition()); - partitionDatas.add(partitionData); - } - - struct.set(PARTITIONS_KEY_NAME, partitionDatas.toArray()); - + super(version); this.controllerId = controllerId; this.controllerEpoch = controllerEpoch; this.deletePartitions = deletePartitions; this.partitions = partitions; } - public StopReplicaRequest(Struct struct, short versionId) { - super(struct, versionId); + public StopReplicaRequest(Struct struct, short version) { + super(version); partitions = new HashSet<>(); for (Object partitionDataObj : struct.getArray(PARTITIONS_KEY_NAME)) { @@ -148,12 +133,27 @@ public class StopReplicaRequest extends AbstractRequest { return partitions; } - public static StopReplicaRequest parse(ByteBuffer buffer, int versionId) { - return new StopReplicaRequest(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer), - (short) versionId); + public static StopReplicaRequest parse(ByteBuffer buffer, short versionId) { + return new StopReplicaRequest(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer), versionId); } - public static StopReplicaRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.STOP_REPLICA.id)); + @Override + protected Struct toStruct() { + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.STOP_REPLICA.id, version())); + + struct.set(CONTROLLER_ID_KEY_NAME, controllerId); + struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); + struct.set(DELETE_PARTITIONS_KEY_NAME, deletePartitions); + + List<Struct> partitionDatas = new ArrayList<>(partitions.size()); + for (TopicPartition partition : partitions) { + Struct partitionData = struct.instance(PARTITIONS_KEY_NAME); + partitionData.set(TOPIC_KEY_NAME, partition.topic()); + partitionData.set(PARTITION_KEY_NAME, partition.partition()); + partitionDatas.add(partitionData); + } + + struct.set(PARTITIONS_KEY_NAME, partitionDatas.toArray()); + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java index b39fb19..5ae5cc1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java @@ -17,7 +17,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -27,7 +26,6 @@ import java.util.List; import java.util.Map; public class StopReplicaResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.STOP_REPLICA.id); private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String PARTITIONS_KEY_NAME = "partitions"; @@ -37,41 +35,20 @@ public class StopReplicaResponse extends AbstractResponse { private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code"; private final Map<TopicPartition, Errors> responses; - private final Errors error; /** * Possible error code: * * STALE_CONTROLLER_EPOCH (11) */ - - public StopReplicaResponse(Map<TopicPartition, Errors> responses) { - this(Errors.NONE, responses); - } + private final Errors error; public StopReplicaResponse(Errors error, Map<TopicPartition, Errors> responses) { - super(new Struct(CURRENT_SCHEMA)); - - List<Struct> responseDatas = new ArrayList<>(responses.size()); - for (Map.Entry<TopicPartition, Errors> response : responses.entrySet()) { - Struct partitionData = struct.instance(PARTITIONS_KEY_NAME); - TopicPartition partition = response.getKey(); - partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic()); - partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition()); - partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue().code()); - responseDatas.add(partitionData); - } - - struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray()); - struct.set(ERROR_CODE_KEY_NAME, error.code()); - this.responses = responses; this.error = error; } public StopReplicaResponse(Struct struct) { - super(struct); - responses = new HashMap<>(); for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) { Struct responseData = (Struct) responseDataObj; @@ -92,11 +69,27 @@ public class StopReplicaResponse extends AbstractResponse { return error; } - public static StopReplicaResponse parse(ByteBuffer buffer, int versionId) { - return new StopReplicaResponse(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer)); + public static StopReplicaResponse parse(ByteBuffer buffer, short versionId) { + return new StopReplicaResponse(ProtoUtils.parseResponse(ApiKeys.STOP_REPLICA.id, versionId, buffer)); } - public static StopReplicaResponse parse(ByteBuffer buffer) { - return new StopReplicaResponse(CURRENT_SCHEMA.read(buffer)); + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.STOP_REPLICA.id, version)); + + List<Struct> responseDatas = new ArrayList<>(responses.size()); + for (Map.Entry<TopicPartition, Errors> response : responses.entrySet()) { + Struct partitionData = struct.instance(PARTITIONS_KEY_NAME); + TopicPartition partition = response.getKey(); + partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic()); + partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition()); + partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue().code()); + responseDatas.add(partitionData); + } + + struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray()); + struct.set(ERROR_CODE_KEY_NAME, error.code()); + + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java index 937bf98..7ad5c9a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java @@ -51,8 +51,8 @@ public class SyncGroupRequest extends AbstractRequest { } @Override - public SyncGroupRequest build() { - return new SyncGroupRequest(groupId, generationId, memberId, groupAssignment, version()); + public SyncGroupRequest build(short version) { + return new SyncGroupRequest(groupId, generationId, memberId, groupAssignment, version); } @Override @@ -75,20 +75,7 @@ public class SyncGroupRequest extends AbstractRequest { private SyncGroupRequest(String groupId, int generationId, String memberId, Map<String, ByteBuffer> groupAssignment, short version) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.SYNC_GROUP.id, version)), version); - struct.set(GROUP_ID_KEY_NAME, groupId); - struct.set(GENERATION_ID_KEY_NAME, generationId); - struct.set(MEMBER_ID_KEY_NAME, memberId); - - List<Struct> memberArray = new ArrayList<>(); - for (Map.Entry<String, ByteBuffer> entries: groupAssignment.entrySet()) { - Struct memberData = struct.instance(GROUP_ASSIGNMENT_KEY_NAME); - memberData.set(MEMBER_ID_KEY_NAME, entries.getKey()); - memberData.set(MEMBER_ASSIGNMENT_KEY_NAME, entries.getValue()); - memberArray.add(memberData); - } - struct.set(GROUP_ASSIGNMENT_KEY_NAME, memberArray.toArray()); - + super(version); this.groupId = groupId; this.generationId = generationId; this.memberId = memberId; @@ -96,7 +83,7 @@ public class SyncGroupRequest extends AbstractRequest { } public SyncGroupRequest(Struct struct, short version) { - super(struct, version); + super(version); this.groupId = struct.getString(GROUP_ID_KEY_NAME); this.generationId = struct.getInt(GENERATION_ID_KEY_NAME); this.memberId = struct.getString(MEMBER_ID_KEY_NAME); @@ -141,12 +128,25 @@ public class SyncGroupRequest extends AbstractRequest { return memberId; } - public static SyncGroupRequest parse(ByteBuffer buffer, int versionId) { - return new SyncGroupRequest(ProtoUtils.parseRequest(ApiKeys.SYNC_GROUP.id, versionId, buffer), - (short) versionId); + public static SyncGroupRequest parse(ByteBuffer buffer, short versionId) { + return new SyncGroupRequest(ProtoUtils.parseRequest(ApiKeys.SYNC_GROUP.id, versionId, buffer), versionId); } - public static SyncGroupRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.SYNC_GROUP.id)); + @Override + protected Struct toStruct() { + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.SYNC_GROUP.id, version())); + struct.set(GROUP_ID_KEY_NAME, groupId); + struct.set(GENERATION_ID_KEY_NAME, generationId); + struct.set(MEMBER_ID_KEY_NAME, memberId); + + List<Struct> memberArray = new ArrayList<>(); + for (Map.Entry<String, ByteBuffer> entries: groupAssignment.entrySet()) { + Struct memberData = struct.instance(GROUP_ASSIGNMENT_KEY_NAME); + memberData.set(MEMBER_ID_KEY_NAME, entries.getKey()); + memberData.set(MEMBER_ASSIGNMENT_KEY_NAME, entries.getValue()); + memberArray.add(memberData); + } + struct.set(GROUP_ASSIGNMENT_KEY_NAME, memberArray.toArray()); + return struct; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java index e598975..ff198aa 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java @@ -19,14 +19,12 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public class SyncGroupResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SYNC_GROUP.id); public static final String ERROR_CODE_KEY_NAME = "error_code"; public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment"; @@ -45,18 +43,11 @@ public class SyncGroupResponse extends AbstractResponse { private final ByteBuffer memberState; public SyncGroupResponse(Errors error, ByteBuffer memberState) { - super(new Struct(CURRENT_SCHEMA)); - - struct.set(ERROR_CODE_KEY_NAME, error.code()); - struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState); - this.error = error; this.memberState = memberState; } public SyncGroupResponse(Struct struct) { - super(struct); - this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME); } @@ -69,8 +60,16 @@ public class SyncGroupResponse extends AbstractResponse { return memberState; } - public static SyncGroupResponse parse(ByteBuffer buffer) { - return new SyncGroupResponse(CURRENT_SCHEMA.read(buffer)); + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.SYNC_GROUP.id, version)); + struct.set(ERROR_CODE_KEY_NAME, error.code()); + struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState); + return struct; + } + + public static SyncGroupResponse parse(ByteBuffer buffer, short version) { + return new SyncGroupResponse(ProtoUtils.parseResponse(ApiKeys.SYNC_GROUP.id, version, buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index ef680ff..8dd852d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -38,10 +38,9 @@ public class UpdateMetadataRequest extends AbstractRequest { private final Map<TopicPartition, PartitionState> partitionStates; private final Set<Broker> liveBrokers; - public Builder(int controllerId, int controllerEpoch, - Map<TopicPartition, PartitionState> partitionStates, - Set<Broker> liveBrokers) { - super(ApiKeys.UPDATE_METADATA_KEY); + public Builder(short version, int controllerId, int controllerEpoch, + Map<TopicPartition, PartitionState> partitionStates, Set<Broker> liveBrokers) { + super(ApiKeys.UPDATE_METADATA_KEY, version); this.controllerId = controllerId; this.controllerEpoch = controllerEpoch; this.partitionStates = partitionStates; @@ -49,8 +48,7 @@ public class UpdateMetadataRequest extends AbstractRequest { } @Override - public UpdateMetadataRequest build() { - short version = version(); + public UpdateMetadataRequest build(short version) { if (version == 0) { for (Broker broker : liveBrokers) { if (broker.endPoints.size() != 1 || broker.endPoints.get(0).securityProtocol != SecurityProtocol.PLAINTEXT) { @@ -148,58 +146,7 @@ public class UpdateMetadataRequest extends AbstractRequest { private UpdateMetadataRequest(short version, int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates, Set<Broker> liveBrokers) { - super(new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)), version); - struct.set(CONTROLLER_ID_KEY_NAME, controllerId); - struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); - - List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size()); - for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) { - Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME); - TopicPartition topicPartition = entry.getKey(); - partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic()); - partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition()); - PartitionState partitionState = entry.getValue(); - partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch); - partitionStateData.set(LEADER_KEY_NAME, partitionState.leader); - partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch); - partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray()); - partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion); - partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray()); - partitionStatesData.add(partitionStateData); - } - struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray()); - - List<Struct> brokersData = new ArrayList<>(liveBrokers.size()); - for (Broker broker : liveBrokers) { - Struct brokerData = struct.instance(LIVE_BROKERS_KEY_NAME); - brokerData.set(BROKER_ID_KEY_NAME, broker.id); - - if (version == 0) { - EndPoint endPoint = broker.endPoints.get(0); - brokerData.set(HOST_KEY_NAME, endPoint.host); - brokerData.set(PORT_KEY_NAME, endPoint.port); - } else { - List<Struct> endPointsData = new ArrayList<>(broker.endPoints.size()); - for (EndPoint endPoint : broker.endPoints) { - Struct endPointData = brokerData.instance(ENDPOINTS_KEY_NAME); - endPointData.set(PORT_KEY_NAME, endPoint.port); - endPointData.set(HOST_KEY_NAME, endPoint.host); - endPointData.set(SECURITY_PROTOCOL_TYPE_KEY_NAME, endPoint.securityProtocol.id); - if (version >= 3) - endPointData.set(LISTENER_NAME_KEY_NAME, endPoint.listenerName.value()); - endPointsData.add(endPointData); - - } - brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray()); - if (version >= 2) { - brokerData.set(RACK_KEY_NAME, broker.rack); - } - } - - brokersData.add(brokerData); - } - struct.set(LIVE_BROKERS_KEY_NAME, brokersData.toArray()); - + super(version); this.controllerId = controllerId; this.controllerEpoch = controllerEpoch; this.partitionStates = partitionStates; @@ -207,7 +154,7 @@ public class UpdateMetadataRequest extends AbstractRequest { } public UpdateMetadataRequest(Struct struct, short versionId) { - super(struct, versionId); + super(versionId); Map<TopicPartition, PartitionState> partitionStates = new HashMap<>(); for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) { Struct partitionStateData = (Struct) partitionStateDataObj; @@ -277,6 +224,64 @@ public class UpdateMetadataRequest extends AbstractRequest { } @Override + protected Struct toStruct() { + short version = version(); + Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)); + struct.set(CONTROLLER_ID_KEY_NAME, controllerId); + struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); + + List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size()); + for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) { + Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME); + TopicPartition topicPartition = entry.getKey(); + partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic()); + partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition()); + PartitionState partitionState = entry.getValue(); + partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch); + partitionStateData.set(LEADER_KEY_NAME, partitionState.leader); + partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch); + partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray()); + partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion); + partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray()); + partitionStatesData.add(partitionStateData); + } + struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray()); + + List<Struct> brokersData = new ArrayList<>(liveBrokers.size()); + for (Broker broker : liveBrokers) { + Struct brokerData = struct.instance(LIVE_BROKERS_KEY_NAME); + brokerData.set(BROKER_ID_KEY_NAME, broker.id); + + if (version == 0) { + EndPoint endPoint = broker.endPoints.get(0); + brokerData.set(HOST_KEY_NAME, endPoint.host); + brokerData.set(PORT_KEY_NAME, endPoint.port); + } else { + List<Struct> endPointsData = new ArrayList<>(broker.endPoints.size()); + for (EndPoint endPoint : broker.endPoints) { + Struct endPointData = brokerData.instance(ENDPOINTS_KEY_NAME); + endPointData.set(PORT_KEY_NAME, endPoint.port); + endPointData.set(HOST_KEY_NAME, endPoint.host); + endPointData.set(SECURITY_PROTOCOL_TYPE_KEY_NAME, endPoint.securityProtocol.id); + if (version >= 3) + endPointData.set(LISTENER_NAME_KEY_NAME, endPoint.listenerName.value()); + endPointsData.add(endPointData); + + } + brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray()); + if (version >= 2) { + brokerData.set(RACK_KEY_NAME, broker.rack); + } + } + + brokersData.add(brokerData); + } + struct.set(LIVE_BROKERS_KEY_NAME, brokersData.toArray()); + + return struct; + } + + @Override public AbstractResponse getErrorResponse(Throwable e) { short versionId = version(); if (versionId <= 3) @@ -302,12 +307,9 @@ public class UpdateMetadataRequest extends AbstractRequest { return liveBrokers; } - public static UpdateMetadataRequest parse(ByteBuffer buffer, int versionId) { + public static UpdateMetadataRequest parse(ByteBuffer buffer, short versionId) { return new UpdateMetadataRequest(ProtoUtils.parseRequest(ApiKeys.UPDATE_METADATA_KEY.id, versionId, buffer), - (short) versionId); + versionId); } - public static UpdateMetadataRequest parse(ByteBuffer buffer) { - return parse(buffer, ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)); - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java index 865d6c6..0032fca 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java @@ -16,15 +16,12 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public class UpdateMetadataResponse extends AbstractResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.UPDATE_METADATA_KEY.id); - private static final String ERROR_CODE_KEY_NAME = "error_code"; /** @@ -35,13 +32,10 @@ public class UpdateMetadataResponse extends AbstractResponse { private final Errors error; public UpdateMetadataResponse(Errors error) { - super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, error.code()); this.error = error; } public UpdateMetadataResponse(Struct struct) { - super(struct); error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); } @@ -49,12 +43,14 @@ public class UpdateMetadataResponse extends AbstractResponse { return error; } - public static UpdateMetadataResponse parse(ByteBuffer buffer) { - return new UpdateMetadataResponse(CURRENT_SCHEMA.read(buffer)); - } - - public static UpdateMetadataResponse parse(ByteBuffer buffer, int version) { + public static UpdateMetadataResponse parse(ByteBuffer buffer, short version) { return new UpdateMetadataResponse(ProtoUtils.parseResponse(ApiKeys.UPDATE_METADATA_KEY.id, version, buffer)); } + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)); + struct.set(ERROR_CODE_KEY_NAME, error.code()); + return struct; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 7f6b7aa..88f8959 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -308,11 +308,12 @@ public class SaslServerAuthenticator implements Authenticator { if (!Protocol.apiVersionSupported(requestHeader.apiKey(), requestHeader.apiVersion())) { if (apiKey == ApiKeys.API_VERSIONS) - sendKafkaResponse(requestHeader, ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION)); + sendKafkaResponse(ApiVersionsResponse.unsupportedVersionSend(node, requestHeader)); else throw new UnsupportedVersionException("Version " + requestHeader.apiVersion() + " is not supported for apiKey " + apiKey); } else { - AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), requestBuffer); + AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), + requestBuffer).request; LOG.debug("Handle Kafka request {}", apiKey); switch (apiKey) { @@ -373,7 +374,11 @@ public class SaslServerAuthenticator implements Authenticator { } private void sendKafkaResponse(RequestHeader requestHeader, AbstractResponse response) throws IOException { - netOutBuffer = response.toSend(node, requestHeader); + sendKafkaResponse(response.toSend(node, requestHeader)); + } + + private void sendKafkaResponse(Send send) throws IOException { + netOutBuffer = send; flushNetOutBufferAndUpdateInterestOps(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java index ba38637..106a7d4 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java @@ -26,14 +26,14 @@ public class CollectionUtils { * @param <T> Partition data type * @return partitioned data */ - public static <T> Map<String, Map<Integer, T>> groupDataByTopic(Map<TopicPartition, T> data) { - Map<String, Map<Integer, T>> dataByTopic = new HashMap<String, Map<Integer, T>>(); - for (Map.Entry<TopicPartition, T> entry: data.entrySet()) { + public static <T> Map<String, Map<Integer, T>> groupDataByTopic(Map<TopicPartition, ? extends T> data) { + Map<String, Map<Integer, T>> dataByTopic = new HashMap<>(); + for (Map.Entry<TopicPartition, ? extends T> entry: data.entrySet()) { String topic = entry.getKey().topic(); int partition = entry.getKey().partition(); Map<Integer, T> topicData = dataByTopic.get(topic); if (topicData == null) { - topicData = new HashMap<Integer, T>(); + topicData = new HashMap<>(); dataByTopic.put(topic, topicData); } topicData.put(partition, entry.getValue()); @@ -47,12 +47,12 @@ public class CollectionUtils { * @return partitions per topic */ public static Map<String, List<Integer>> groupDataByTopic(List<TopicPartition> partitions) { - Map<String, List<Integer>> partitionsByTopic = new HashMap<String, List<Integer>>(); + Map<String, List<Integer>> partitionsByTopic = new HashMap<>(); for (TopicPartition tp: partitions) { String topic = tp.topic(); List<Integer> topicData = partitionsByTopic.get(topic); if (topicData == null) { - topicData = new ArrayList<Integer>(); + topicData = new ArrayList<>(); partitionsByTopic.put(topic, topicData); } topicData.add(tp.partition()); http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 50ed131..7712d3c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -131,7 +131,8 @@ public class MockClient implements KafkaClient { while (iter.hasNext()) { ClientRequest request = iter.next(); if (request.destination().equals(node)) { - responses.add(new ClientResponse(request.makeHeader(), request.callback(), request.destination(), + short version = request.requestBuilder().desiredOrLatestVersion(); + responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(), request.createdTimeMs(), now, true, null, null)); iter.remove(); } @@ -146,13 +147,11 @@ public class MockClient implements KafkaClient { FutureResponse futureResp = iterator.next(); if (futureResp.node != null && !request.destination().equals(futureResp.node.idString())) continue; - request.requestBuilder().setVersion(nodeApiVersions.usableVersion( - request.requestBuilder().apiKey())); - AbstractRequest abstractRequest = request.requestBuilder().build(); + short usableVersion = nodeApiVersions.usableVersion(request.requestBuilder().apiKey()); + AbstractRequest abstractRequest = request.requestBuilder().build(usableVersion); if (!futureResp.requestMatcher.matches(abstractRequest)) throw new IllegalStateException("Next in line response did not match expected request"); - - ClientResponse resp = new ClientResponse(request.makeHeader(), request.callback(), request.destination(), + ClientResponse resp = new ClientResponse(request.makeHeader(usableVersion), request.callback(), request.destination(), request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, null, futureResp.responseBody); responses.add(resp); iterator.remove(); @@ -192,7 +191,8 @@ public class MockClient implements KafkaClient { public void respond(AbstractResponse response, boolean disconnected) { ClientRequest request = requests.remove(); - responses.add(new ClientResponse(request.makeHeader(), request.callback(), request.destination(), + short version = request.requestBuilder().desiredOrLatestVersion(); + responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(), request.createdTimeMs(), time.milliseconds(), disconnected, null, response)); } @@ -206,7 +206,8 @@ public class MockClient implements KafkaClient { ClientRequest request = iterator.next(); if (request.destination().equals(node.idString())) { iterator.remove(); - responses.add(new ClientResponse(request.makeHeader(), request.callback(), request.destination(), + short version = request.requestBuilder().desiredOrLatestVersion(); + responses.add(new ClientResponse(request.makeHeader(version), request.callback(), request.destination(), request.createdTimeMs(), time.milliseconds(), disconnected, null, response)); return; } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index deaf2cc..c89cc24 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.MemoryRecords; -import org.apache.kafka.common.requests.AbstractRequestResponse; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.ProduceRequest; @@ -137,9 +136,10 @@ public class NetworkClientTest { ResponseHeader respHeader = new ResponseHeader(request.correlationId()); Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); resp.set("responses", new Object[0]); - int size = respHeader.sizeOf() + resp.sizeOf(); + Struct responseHeaderStruct = respHeader.toStruct(); + int size = responseHeaderStruct.sizeOf() + resp.sizeOf(); ByteBuffer buffer = ByteBuffer.allocate(size); - respHeader.writeTo(buffer); + responseHeaderStruct.writeTo(buffer); resp.writeTo(buffer); buffer.flip(); selector.completeReceive(new NetworkReceive(node.idString(), buffer)); @@ -152,9 +152,7 @@ public class NetworkClientTest { } private void maybeSetExpectedApiVersionsResponse() { - ResponseHeader responseHeader = new ResponseHeader(0); - ByteBuffer buffer = AbstractRequestResponse.serialize(responseHeader, - ApiVersionsResponse.API_VERSIONS_RESPONSE); + ByteBuffer buffer = ApiVersionsResponse.API_VERSIONS_RESPONSE.serialize((short) 0, new ResponseHeader(0)); selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer))); } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index a355aa1..5c4590b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1436,7 +1436,7 @@ public class KafkaConsumerTest { partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error, 1L, partitionOffset.getValue())); } - return new ListOffsetResponse(partitionData, 1); + return new ListOffsetResponse(partitionData); } private FetchResponse fetchResponse(Map<TopicPartition, FetchInfo> fetches) {