Repository: kafka Updated Branches: refs/heads/trunk 5e0bb3df9 -> 92c06cbad
MINOR: Protocol schema refactor follow-up - Use constants in a few places that were missed - Remove ProtoUtils by moving its methods to Schema - Merge SchemaVisitor and SchemaVisitorAdapter - Change SchemaVisitor package. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #3895 from ijuma/protocol-schema-refactor-follow-ups Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/92c06cba Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/92c06cba Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/92c06cba Branch: refs/heads/trunk Commit: 92c06cbad591b88e69f416180052a72af455fa30 Parents: 5e0bb3d Author: Ismael Juma <ism...@juma.me.uk> Authored: Tue Sep 19 11:07:32 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Sep 19 11:07:32 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/common/protocol/ApiKeys.java | 16 +++---- .../kafka/common/protocol/ProtoUtils.java | 47 -------------------- .../kafka/common/protocol/SchemaVisitor.java | 27 ----------- .../common/protocol/SchemaVisitorAdapter.java | 38 ---------------- .../kafka/common/protocol/types/Schema.java | 31 +++++++++++++ .../requests/AlterReplicaDirResponse.java | 6 +-- .../common/requests/ApiVersionsResponse.java | 2 +- .../requests/ControlledShutdownResponse.java | 2 +- .../common/requests/DeleteRecordsResponse.java | 4 +- .../common/requests/DescribeLogDirsRequest.java | 4 +- .../requests/DescribeLogDirsResponse.java | 16 +++---- .../kafka/common/requests/FetchRequest.java | 2 +- .../kafka/common/requests/FetchResponse.java | 4 +- .../common/requests/LeaderAndIsrRequest.java | 3 +- .../common/requests/ListGroupsResponse.java | 8 ++-- .../common/requests/OffsetCommitResponse.java | 1 - .../requests/OffsetsForLeaderEpochRequest.java | 2 +- .../kafka/common/requests/RequestHeader.java | 1 - .../common/requests/UpdateMetadataRequest.java | 3 +- .../common/requests/WriteTxnMarkersRequest.java | 11 ++--- .../requests/WriteTxnMarkersResponse.java | 13 +++--- 21 files changed, 81 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 0e087eb..62dce79 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -97,7 +97,7 @@ import org.apache.kafka.common.requests.WriteTxnMarkersRequest; import org.apache.kafka.common.requests.WriteTxnMarkersResponse; import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.common.protocol.types.Type.BYTES; import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES; @@ -325,18 +325,16 @@ public enum ApiKeys { } private static boolean retainsBufferReference(Schema schema) { - final AtomicReference<Boolean> foundBufferReference = new AtomicReference<>(Boolean.FALSE); - SchemaVisitor detector = new SchemaVisitorAdapter() { + final AtomicBoolean hasBuffer = new AtomicBoolean(false); + Schema.Visitor detector = new Schema.Visitor() { @Override public void visit(Type field) { - if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS) { - foundBufferReference.set(Boolean.TRUE); - } + if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS) + hasBuffer.set(true); } }; - foundBufferReference.set(Boolean.FALSE); - ProtoUtils.walk(schema, detector); - return foundBufferReference.get(); + schema.walk(detector); + return hasBuffer.get(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java deleted file mode 100644 index f9be12c..0000000 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.protocol; - -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.BoundField; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Type; - -public class ProtoUtils { - public static void walk(Schema schema, SchemaVisitor visitor) { - if (schema == null || visitor == null) { - throw new IllegalArgumentException("Both schema and visitor must be provided"); - } - handleNode(schema, visitor); - } - - private static void handleNode(Type node, SchemaVisitor visitor) { - if (node instanceof Schema) { - Schema schema = (Schema) node; - visitor.visit(schema); - for (BoundField f : schema.fields()) { - handleNode(f.def.type, visitor); - } - } else if (node instanceof ArrayOf) { - ArrayOf array = (ArrayOf) node; - visitor.visit(array); - handleNode(array.type(), visitor); - } else { - visitor.visit(node); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java deleted file mode 100644 index e61cc77..0000000 --- a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.protocol; - -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Type; - -public interface SchemaVisitor { - void visit(Schema schema); - void visit(ArrayOf array); - void visit(Type field); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java deleted file mode 100644 index 62834d0..0000000 --- a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.protocol; - -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Type; - -public abstract class SchemaVisitorAdapter implements SchemaVisitor { - @Override - public void visit(Schema schema) { - //nop - } - - @Override - public void visit(ArrayOf array) { - //nop - } - - @Override - public void visit(Type field) { - //nop - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java index 187e14b..faa1540 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java @@ -19,6 +19,7 @@ package org.apache.kafka.common.protocol.types; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; +import java.util.Objects; /** * The schema for a compound record definition @@ -164,4 +165,34 @@ public class Schema extends Type { } } + public void walk(Visitor visitor) { + Objects.requireNonNull(visitor, "visitor must be non-null"); + handleNode(this, visitor); + } + + private static void handleNode(Type node, Visitor visitor) { + if (node instanceof Schema) { + Schema schema = (Schema) node; + visitor.visit(schema); + for (BoundField f : schema.fields()) + handleNode(f.def.type, visitor); + } else if (node instanceof ArrayOf) { + ArrayOf array = (ArrayOf) node; + visitor.visit(array); + handleNode(array.type(), visitor); + } else { + visitor.visit(node); + } + } + + /** + * Override one or more of the visit methods with the desired logic. + */ + public static abstract class Visitor { + public void visit(Schema schema) {} + public void visit(ArrayOf array) {} + public void visit(Type field) {} + } + + } http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java index ed00b75..1767d45 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaDirResponse.java @@ -48,9 +48,9 @@ public class AlterReplicaDirResponse extends AbstractResponse { private static final Schema ALTER_REPLICA_DIR_RESPONSE_V0 = new Schema( THROTTLE_TIME_MS, - new Field("topics", new ArrayOf(new Schema( + new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( TOPIC_NAME, - new Field("partitions", new ArrayOf(new Schema( + new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema( PARTITION_ID, ERROR_CODE))))))); @@ -127,4 +127,4 @@ public class AlterReplicaDirResponse extends AbstractResponse { public static AlterReplicaDirResponse parse(ByteBuffer buffer, short version) { return new AlterReplicaDirResponse(ApiKeys.ALTER_REPLICA_DIR.responseSchema(version).read(buffer)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 6a0418f..2bdc8aa 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -165,7 +165,7 @@ public class ApiVersionsResponse extends AbstractResponse { private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) { Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>(); - for (ApiVersion apiVersion: apiVersions) { + for (ApiVersion apiVersion : apiVersions) { tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion); } return tempApiIdToApiVersion; http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java index e0b3860..dfd68e7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java @@ -44,7 +44,7 @@ public class ControlledShutdownResponse extends AbstractResponse { private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V0 = new Schema( ERROR_CODE, - new Field("partitions_remaining", new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0), "The partitions " + + new Field(PARTITIONS_REMAINING_KEY_NAME, new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V0), "The partitions " + "that the broker still leads.")); private static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = CONTROLLED_SHUTDOWN_RESPONSE_V0; http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java index aeea1cd..5bfdec8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java @@ -62,7 +62,7 @@ public class DeleteRecordsResponse extends AbstractResponse { private static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema( THROTTLE_TIME_MS, - new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0))); + new Field(TOPICS_KEY_NAME, new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0))); public static Schema[] schemaVersions() { return new Schema[]{DELETE_RECORDS_RESPONSE_V0}; @@ -164,4 +164,4 @@ public class DeleteRecordsResponse extends AbstractResponse { public static DeleteRecordsResponse parse(ByteBuffer buffer, short version) { return new DeleteRecordsResponse(ApiKeys.DELETE_RECORDS.responseSchema(version).read(buffer)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java index 0169da5..5f35c43 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsRequest.java @@ -45,9 +45,9 @@ public class DescribeLogDirsRequest extends AbstractRequest { private static final String PARTITIONS_KEY_NAME = "partitions"; private static final Schema DESCRIBE_LOG_DIRS_REQUEST_V0 = new Schema( - new Field("topics", ArrayOf.nullable(new Schema( + new Field(TOPICS_KEY_NAME, ArrayOf.nullable(new Schema( TOPIC_NAME, - new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic."))))); + new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32), "List of partition ids of the topic."))))); public static Schema[] schemaVersions() { return new Schema[]{DESCRIBE_LOG_DIRS_REQUEST_V0}; http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java index e35056e..dc226d8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java @@ -62,18 +62,18 @@ public class DescribeLogDirsResponse extends AbstractResponse { private static final Schema DESCRIBE_LOG_DIRS_RESPONSE_V0 = new Schema( THROTTLE_TIME_MS, - new Field("log_dirs", new ArrayOf(new Schema( + new Field(LOG_DIRS_KEY_NAME, new ArrayOf(new Schema( ERROR_CODE, - new Field("log_dir", STRING, "The absolute log directory path."), - new Field("topics", new ArrayOf(new Schema( + new Field(LOG_DIR_KEY_NAME, STRING, "The absolute log directory path."), + new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( TOPIC_NAME, - new Field("partitions", new ArrayOf(new Schema( + new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema( PARTITION_ID, - new Field("size", INT64, "The size of the log segments of the partition in bytes."), - new Field("offset_lag", INT64, "The lag of the log's LEO w.r.t. partition's HW " + + new Field(SIZE_KEY_NAME, INT64, "The size of the log segments of the partition in bytes."), + new Field(OFFSET_LAG_KEY_NAME, INT64, "The lag of the log's LEO w.r.t. partition's HW " + "(if it is the current log for the partition) or current replica's LEO " + "(if it is the future log for the partition)"), - new Field("is_future", BOOLEAN, "True if this log is created by " + + new Field(IS_FUTURE_KEY_NAME, BOOLEAN, "True if this log is created by " + "AlterReplicaDirRequest and will replace the current log of the replica " + "in the future."))))))))))); @@ -211,4 +211,4 @@ public class DescribeLogDirsResponse extends AbstractResponse { return builder.toString(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java index 4a60c94..3fea26c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -60,7 +60,7 @@ public class FetchRequest extends AbstractRequest { new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."), new Field(MAX_BYTES_KEY_NAME, INT32, "Maximum bytes to fetch.")); - // FETCH_REQUEST_PARTITION_V1 added log_start_offset field - the earliest available offset of partition data that can be consumed. + // FETCH_REQUEST_PARTITION_V5 added log_start_offset field - the earliest available offset of partition data that can be consumed. private static final Schema FETCH_REQUEST_PARTITION_V5 = new Schema( PARTITION_ID, new Field(FETCH_OFFSET_KEY_NAME, INT64, "Message offset."), http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index 417e845..f8d3090 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -84,8 +84,10 @@ public class FetchResponse extends AbstractResponse { new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); // Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1, // record set only includes messages of v0 (magic byte 0). In v2, record set can include messages of v0 and v1 - // (magic byte 0 and 1). For details, see ByteBufferMessageSet. + // (magic byte 0 and 1). For details, see Records, RecordBatch and Record. private static final Schema FETCH_RESPONSE_V2 = FETCH_RESPONSE_V1; + + // The partition ordering is now relevant - partitions will be processed in order they appear in request. private static final Schema FETCH_RESPONSE_V3 = FETCH_RESPONSE_V2; // The v4 Fetch Response adds features for transactional consumption (the aborted transaction list and the http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index 73f037f..27aaf0a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -93,7 +93,8 @@ public class LeaderAndIsrRequest extends AbstractRequest { new Field(PARTITION_STATES_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)), new Field(LIVE_LEADERS_KEY_NAME, new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0))); - // LEADER_AND_ISR_REQUEST_V1 added a per-partition is_new Field. This field specifies whether the replica should have existed on the broker or not. + // LEADER_AND_ISR_REQUEST_V1 added a per-partition is_new Field. This field specifies whether the replica should + // have existed on the broker or not. private static final Schema LEADER_AND_ISR_REQUEST_V1 = new Schema( new Field(CONTROLLER_ID_KEY_NAME, INT32, "The controller id."), new Field(CONTROLLER_EPOCH_KEY_NAME, INT32, "The controller epoch."), http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java index cdf4c59..8f48f39 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -38,15 +38,15 @@ public class ListGroupsResponse extends AbstractResponse { private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; private static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema( - new Field("group_id", STRING), - new Field("protocol_type", STRING)); + new Field(GROUP_ID_KEY_NAME, STRING), + new Field(PROTOCOL_TYPE_KEY_NAME, STRING)); private static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema( ERROR_CODE, - new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); + new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); private static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema( THROTTLE_TIME_MS, ERROR_CODE, - new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); + new Field(GROUPS_KEY_NAME, new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); public static Schema[] schemaVersions() { return new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1}; http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index 0181eef..13484ed 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -85,7 +85,6 @@ public class OffsetCommitResponse extends AbstractResponse { OFFSET_COMMIT_RESPONSE_V3}; } - private final Map<TopicPartition, Errors> responseData; private final int throttleTimeMs; http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java index b5fce78..d0585be 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java @@ -43,7 +43,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest { /* Offsets for Leader Epoch api */ private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0 = new Schema( PARTITION_ID, - new Field("leader_epoch", INT32, "The epoch")); + new Field(LEADER_EPOCH, INT32, "The epoch")); private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0 = new Schema( TOPIC_NAME, new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0))); http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java index 1284e7e..956d813 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java @@ -157,7 +157,6 @@ public class RequestHeader extends AbstractRequestResponse { return result; } - private static Schema schema(short apiKey, short version) { if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN.id && version == 0) // This will be removed once we remove support for v0 of ControlledShutdownRequest, which http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 67ae8e1..6c36bda 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -126,7 +126,8 @@ public class UpdateMetadataRequest extends AbstractRequest { private static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V3 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V2; - // UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 added a per-partition offline_replicas field. This field specifies the list of replicas that are offline. + // UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 added a per-partition offline_replicas field. This field specifies + // the list of replicas that are offline. private static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V4 = new Schema( TOPIC_NAME, PARTITION_ID, http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java index 96dfb2f..3f7a0c9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java @@ -40,7 +40,7 @@ import static org.apache.kafka.common.protocol.types.Type.INT64; public class WriteTxnMarkersRequest extends AbstractRequest { private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch"; - private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers"; + private static final String TXN_MARKERS_KEY_NAME = "transaction_markers"; private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch"; @@ -60,7 +60,8 @@ public class WriteTxnMarkersRequest extends AbstractRequest { "hosted by this transaction coordinator")); private static final Schema WRITE_TXN_MARKERS_REQUEST_V0 = new Schema( - new Field(TXN_MARKER_ENTRY_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "The transaction markers to be written.")); + new Field(TXN_MARKERS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "The transaction markers to " + + "be written.")); public static Schema[] schemaVersions() { return new Schema[]{WRITE_TXN_MARKERS_REQUEST_V0}; @@ -160,7 +161,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest { public WriteTxnMarkersRequest(Struct struct, short version) { super(version); List<TxnMarkerEntry> markers = new ArrayList<>(); - Object[] markersArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME); + Object[] markersArray = struct.getArray(TXN_MARKERS_KEY_NAME); for (Object markerObj : markersArray) { Struct markerStruct = (Struct) markerObj; @@ -197,7 +198,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest { Object[] markersArray = new Object[markers.size()]; int i = 0; for (TxnMarkerEntry entry : markers) { - Struct markerStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME); + Struct markerStruct = struct.instance(TXN_MARKERS_KEY_NAME); markerStruct.set(PRODUCER_ID_KEY_NAME, entry.producerId); markerStruct.set(PRODUCER_EPOCH_KEY_NAME, entry.producerEpoch); markerStruct.set(COORDINATOR_EPOCH_KEY_NAME, entry.coordinatorEpoch); @@ -215,7 +216,7 @@ public class WriteTxnMarkersRequest extends AbstractRequest { markerStruct.set(TOPICS_KEY_NAME, partitionsArray); markersArray[i++] = markerStruct; } - struct.set(TXN_MARKER_ENTRY_KEY_NAME, markersArray); + struct.set(TXN_MARKERS_KEY_NAME, markersArray); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/92c06cba/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java index 3372670..797fb59 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java @@ -35,7 +35,7 @@ import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; import static org.apache.kafka.common.protocol.types.Type.INT64; public class WriteTxnMarkersResponse extends AbstractResponse { - private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers"; + private static final String TXN_MARKERS_KEY_NAME = "transaction_markers"; private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String TOPICS_KEY_NAME = "topics"; @@ -45,7 +45,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse { PARTITION_ID, ERROR_CODE); - private static final Schema WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0 = new Schema( + private static final Schema WRITE_TXN_MARKERS_ENTRY_V0 = new Schema( new Field(PRODUCER_ID_KEY_NAME, INT64, "Current producer id in use by the transactional id."), new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( TOPIC_NAME, @@ -53,7 +53,8 @@ public class WriteTxnMarkersResponse extends AbstractResponse { "Errors per partition from writing markers.")); private static final Schema WRITE_TXN_MARKERS_RESPONSE_V0 = new Schema( - new Field("transaction_markers", new ArrayOf(WRITE_TXN_MARKERS_ENTRY_RESPONSE_V0), "Errors per partition from writing markers.")); + new Field(TXN_MARKERS_KEY_NAME, new ArrayOf(WRITE_TXN_MARKERS_ENTRY_V0), "Errors per partition from " + + "writing markers.")); public static Schema[] schemaVersions() { return new Schema[]{WRITE_TXN_MARKERS_RESPONSE_V0}; @@ -82,7 +83,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse { public WriteTxnMarkersResponse(Struct struct) { Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>(); - Object[] responseArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME); + Object[] responseArray = struct.getArray(TXN_MARKERS_KEY_NAME); for (Object responseObj : responseArray) { Struct responseStruct = (Struct) responseObj; @@ -113,7 +114,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse { Object[] responsesArray = new Object[errors.size()]; int k = 0; for (Map.Entry<Long, Map<TopicPartition, Errors>> responseEntry : errors.entrySet()) { - Struct responseStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME); + Struct responseStruct = struct.instance(TXN_MARKERS_KEY_NAME); responseStruct.set(PRODUCER_ID_KEY_NAME, responseEntry.getKey()); Map<TopicPartition, Errors> partitionAndErrors = responseEntry.getValue(); @@ -141,7 +142,7 @@ public class WriteTxnMarkersResponse extends AbstractResponse { responsesArray[k++] = responseStruct; } - struct.set(TXN_MARKER_ENTRY_KEY_NAME, responsesArray); + struct.set(TXN_MARKERS_KEY_NAME, responsesArray); return struct; }