http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 9f8e981..36094b0 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -20,7 +20,12 @@ import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; -import static org.apache.kafka.common.protocol.types.Type.*; +import static org.apache.kafka.common.protocol.types.Type.BYTES; +import static org.apache.kafka.common.protocol.types.Type.INT16; +import static org.apache.kafka.common.protocol.types.Type.INT32; +import static org.apache.kafka.common.protocol.types.Type.INT64; +import static org.apache.kafka.common.protocol.types.Type.INT8; +import static org.apache.kafka.common.protocol.types.Type.STRING; public class Protocol { @@ -180,31 +185,31 @@ public class Protocol { public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id", STRING, - "The consumer group id."), + "The group id."), new Field("topics", new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), "Topics to commit offsets.")); public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id", STRING, - "The consumer group id."), + "The group id."), new Field("group_generation_id", INT32, - "The generation of the consumer group."), - new Field("consumer_id", + "The generation of the group."), + new Field("member_id", STRING, - "The consumer id assigned by the group coordinator."), + "The member id assigned by the group coordinator."), new Field("topics", new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1), "Topics to commit offsets.")); public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id", STRING, - "The consumer group id."), + "The group id."), new Field("group_generation_id", INT32, "The generation of the consumer group."), - new Field("consumer_id", + new Field("member_id", STRING, "The consumer id assigned by the group coordinator."), new Field("retention_time", @@ -384,17 +389,17 @@ public class Protocol { public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1}; /* Consumer metadata api */ - public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id", - STRING, - "The consumer group id.")); + public static final Schema GROUP_METADATA_REQUEST_V0 = new Schema(new Field("group_id", + STRING, + "The unique group id.")); - public static final Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16), - new Field("coordinator", - BROKER, - "Host and port information for the coordinator for a consumer group.")); + public static final Schema GROUP_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16), + new Field("coordinator", + BROKER, + "Host and port information for the coordinator for a consumer group.")); - public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0}; - public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0}; + public static final Schema[] GROUP_METADATA_REQUEST = new Schema[] {GROUP_METADATA_REQUEST_V0}; + public static final Schema[] GROUP_METADATA_RESPONSE = new Schema[] {GROUP_METADATA_RESPONSE_V0}; /* Controlled shutdown api */ public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id", @@ -416,45 +421,67 @@ public class Protocol { public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = new Schema[] {null, CONTROLLED_SHUTDOWN_RESPONSE_V1}; /* Join group api */ + public static final Schema JOIN_GROUP_REQUEST_PROTOCOL_V0 = new Schema(new Field("protocol_name", STRING), + new Field("protocol_metadata", BYTES)); + public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, - "The consumer group id."), + "The group id."), new Field("session_timeout", INT32, "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."), - new Field("topics", - new ArrayOf(STRING), - "An array of topics to subscribe to."), - new Field("consumer_id", + new Field("member_id", STRING, "The assigned consumer id or an empty string for a new consumer."), - new Field("partition_assignment_strategy", + new Field("protocol_type", STRING, - "The strategy for the coordinator to assign partitions.")); + "Unique name for class of protocols implemented by group"), + new Field("group_protocols", + new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), + "List of protocols that the member supports")); + - public static final Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING), - new Field("partitions", new ArrayOf(INT32))); + public static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING), + new Field("member_metadata", BYTES)); public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16), - new Field("group_generation_id", + new Field("generation_id", INT32, "The generation of the consumer group."), - new Field("consumer_id", + new Field("group_protocol", + STRING, + "The group protocol selected by the coordinator"), + new Field("leader_id", + STRING, + "The leader of the group"), + new Field("member_id", STRING, "The consumer id assigned by the group coordinator."), - new Field("assigned_partitions", - new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0))); + new Field("members", + new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0}; public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0}; + /* SyncGroup api */ + public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING), + new Field("member_assignment", BYTES)); + public static final Schema SYNC_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING), + new Field("generation_id", INT32), + new Field("member_id", STRING), + new Field("group_assignment", new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0))); + public static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16), + new Field("member_assignment", BYTES)); + public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0}; + public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0}; + /* Heartbeat api */ public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."), new Field("group_generation_id", INT32, "The generation of the consumer group."), - new Field("consumer_id", + new Field("member_id", STRING, - "The consumer id assigned by the group coordinator.")); + "The member id assigned by the group coordinator.")); public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16)); @@ -589,10 +616,11 @@ public class Protocol { REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST; REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST; REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST; - REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST; + REQUESTS[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_REQUEST; REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST; REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST; REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST; + REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; @@ -605,10 +633,11 @@ public class Protocol { RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE; RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE; RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE; - RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE; + RESPONSES[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_RESPONSE; RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE; RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE; RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE; + RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE; /* set the maximum version of each api */ for (ApiKeys api : ApiKeys.values())
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 095cd52..03e77a5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -49,14 +49,16 @@ public abstract class AbstractRequest extends AbstractRequestResponse { return OffsetCommitRequest.parse(buffer, versionId); case OFFSET_FETCH: return OffsetFetchRequest.parse(buffer, versionId); - case CONSUMER_METADATA: - return ConsumerMetadataRequest.parse(buffer, versionId); + case GROUP_METADATA: + return GroupMetadataRequest.parse(buffer, versionId); case JOIN_GROUP: return JoinGroupRequest.parse(buffer, versionId); case HEARTBEAT: return HeartbeatRequest.parse(buffer, versionId); case LEAVE_GROUP: return LeaveGroupRequest.parse(buffer, versionId); + case SYNC_GROUP: + return SyncGroupRequest.parse(buffer, versionId); case STOP_REPLICA: return StopReplicaRequest.parse(buffer, versionId); case CONTROLLED_SHUTDOWN_KEY: http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java deleted file mode 100644 index 5b3e04a..0000000 --- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.Node; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; - -import java.nio.ByteBuffer; - -public class ConsumerMetadataRequest extends AbstractRequest { - - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id); - private static final String GROUP_ID_KEY_NAME = "group_id"; - - private final String groupId; - - public ConsumerMetadataRequest(String groupId) { - super(new Struct(CURRENT_SCHEMA)); - - struct.set(GROUP_ID_KEY_NAME, groupId); - this.groupId = groupId; - } - - public ConsumerMetadataRequest(Struct struct) { - super(struct); - groupId = struct.getString(GROUP_ID_KEY_NAME); - } - - @Override - public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { - switch (versionId) { - case 0: - return new ConsumerMetadataResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode()); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONSUMER_METADATA.id))); - } - } - - public String groupId() { - return groupId; - } - - public static ConsumerMetadataRequest parse(ByteBuffer buffer, int versionId) { - return new ConsumerMetadataRequest(ProtoUtils.parseRequest(ApiKeys.CONSUMER_METADATA.id, versionId, buffer)); - } - - public static ConsumerMetadataRequest parse(ByteBuffer buffer) { - return new ConsumerMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer)); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java deleted file mode 100644 index 0c250c3..0000000 --- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.common.requests; - -import org.apache.kafka.common.Node; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ProtoUtils; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; - -import java.nio.ByteBuffer; - -public class ConsumerMetadataResponse extends AbstractRequestResponse { - - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id); - private static final String ERROR_CODE_KEY_NAME = "error_code"; - private static final String COORDINATOR_KEY_NAME = "coordinator"; - - // coordinator level field names - private static final String NODE_ID_KEY_NAME = "node_id"; - private static final String HOST_KEY_NAME = "host"; - private static final String PORT_KEY_NAME = "port"; - - private final short errorCode; - private final Node node; - - public ConsumerMetadataResponse(short errorCode, Node node) { - super(new Struct(CURRENT_SCHEMA)); - struct.set(ERROR_CODE_KEY_NAME, errorCode); - Struct coordinator = struct.instance(COORDINATOR_KEY_NAME); - coordinator.set(NODE_ID_KEY_NAME, node.id()); - coordinator.set(HOST_KEY_NAME, node.host()); - coordinator.set(PORT_KEY_NAME, node.port()); - struct.set(COORDINATOR_KEY_NAME, coordinator); - this.errorCode = errorCode; - this.node = node; - } - - public ConsumerMetadataResponse(Struct struct) { - super(struct); - errorCode = struct.getShort(ERROR_CODE_KEY_NAME); - Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME); - int nodeId = broker.getInt(NODE_ID_KEY_NAME); - String host = broker.getString(HOST_KEY_NAME); - int port = broker.getInt(PORT_KEY_NAME); - node = new Node(nodeId, host, port); - } - - public short errorCode() { - return errorCode; - } - - public Node node() { - return node; - } - - public static ConsumerMetadataResponse parse(ByteBuffer buffer) { - return new ConsumerMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java new file mode 100644 index 0000000..fd54c5a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class GroupMetadataRequest extends AbstractRequest { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_METADATA.id); + private static final String GROUP_ID_KEY_NAME = "group_id"; + + private final String groupId; + + public GroupMetadataRequest(String groupId) { + super(new Struct(CURRENT_SCHEMA)); + + struct.set(GROUP_ID_KEY_NAME, groupId); + this.groupId = groupId; + } + + public GroupMetadataRequest(Struct struct) { + super(struct); + groupId = struct.getString(GROUP_ID_KEY_NAME); + } + + @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + switch (versionId) { + case 0: + return new GroupMetadataResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode()); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_METADATA.id))); + } + } + + public String groupId() { + return groupId; + } + + public static GroupMetadataRequest parse(ByteBuffer buffer, int versionId) { + return new GroupMetadataRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_METADATA.id, versionId, buffer)); + } + + public static GroupMetadataRequest parse(ByteBuffer buffer) { + return new GroupMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java new file mode 100644 index 0000000..a5ef478 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class GroupMetadataResponse extends AbstractRequestResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_METADATA.id); + private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final String COORDINATOR_KEY_NAME = "coordinator"; + + // coordinator level field names + private static final String NODE_ID_KEY_NAME = "node_id"; + private static final String HOST_KEY_NAME = "host"; + private static final String PORT_KEY_NAME = "port"; + + private final short errorCode; + private final Node node; + + public GroupMetadataResponse(short errorCode, Node node) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(ERROR_CODE_KEY_NAME, errorCode); + Struct coordinator = struct.instance(COORDINATOR_KEY_NAME); + coordinator.set(NODE_ID_KEY_NAME, node.id()); + coordinator.set(HOST_KEY_NAME, node.host()); + coordinator.set(PORT_KEY_NAME, node.port()); + struct.set(COORDINATOR_KEY_NAME, coordinator); + this.errorCode = errorCode; + this.node = node; + } + + public GroupMetadataResponse(Struct struct) { + super(struct); + errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME); + int nodeId = broker.getInt(NODE_ID_KEY_NAME); + String host = broker.getString(HOST_KEY_NAME); + int port = broker.getInt(PORT_KEY_NAME); + node = new Node(nodeId, host, port); + } + + public short errorCode() { + return errorCode; + } + + public Node node() { + return node; + } + + public static GroupMetadataResponse parse(ByteBuffer buffer) { + return new GroupMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java index 89719f1..74be3ed 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java @@ -25,27 +25,27 @@ public class HeartbeatRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id); private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id"; - private static final String CONSUMER_ID_KEY_NAME = "consumer_id"; + private static final String MEMBER_ID_KEY_NAME = "member_id"; private final String groupId; private final int groupGenerationId; - private final String consumerId; + private final String memberId; - public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) { + public HeartbeatRequest(String groupId, int groupGenerationId, String memberId) { super(new Struct(CURRENT_SCHEMA)); struct.set(GROUP_ID_KEY_NAME, groupId); struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId); - struct.set(CONSUMER_ID_KEY_NAME, consumerId); + struct.set(MEMBER_ID_KEY_NAME, memberId); this.groupId = groupId; this.groupGenerationId = groupGenerationId; - this.consumerId = consumerId; + this.memberId = memberId; } public HeartbeatRequest(Struct struct) { super(struct); groupId = struct.getString(GROUP_ID_KEY_NAME); groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME); - consumerId = struct.getString(CONSUMER_ID_KEY_NAME); + memberId = struct.getString(MEMBER_ID_KEY_NAME); } @Override @@ -67,8 +67,8 @@ public class HeartbeatRequest extends AbstractRequest { return groupGenerationId; } - public String consumerId() { - return consumerId; + public String memberId() { + return memberId; } public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) { http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java index 96e6ab0..48cb4c0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java @@ -27,10 +27,10 @@ public class HeartbeatResponse extends AbstractRequestResponse { /** * Possible error code: * - * CONSUMER_COORDINATOR_NOT_AVAILABLE (15) - * NOT_COORDINATOR_FOR_CONSUMER (16) + * GROUP_COORDINATOR_NOT_AVAILABLE (15) + * NOT_COORDINATOR_FOR_GROUP (16) * ILLEGAL_GENERATION (22) - * UNKNOWN_CONSUMER_ID (25) + * UNKNOWN_MEMBER_ID (25) */ private final short errorCode; http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java index 1ffe076..91a698c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java @@ -12,7 +12,6 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; @@ -29,42 +28,79 @@ public class JoinGroupRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id); private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout"; - private static final String TOPICS_KEY_NAME = "topics"; - private static final String CONSUMER_ID_KEY_NAME = "consumer_id"; - private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy"; + private static final String MEMBER_ID_KEY_NAME = "member_id"; + private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; + private static final String GROUP_PROTOCOLS_KEY_NAME = "group_protocols"; + private static final String PROTOCOL_NAME_KEY_NAME = "protocol_name"; + private static final String PROTOCOL_METADATA_KEY_NAME = "protocol_metadata"; - public static final String UNKNOWN_CONSUMER_ID = ""; + public static final String UNKNOWN_MEMBER_ID = ""; private final String groupId; private final int sessionTimeout; - private final List<String> topics; - private final String consumerId; - private final String strategy; + private final String memberId; + private final String protocolType; + private final List<GroupProtocol> groupProtocols; - public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) { + public static class GroupProtocol { + private final String name; + private final ByteBuffer metadata; + + public GroupProtocol(String name, ByteBuffer metadata) { + this.name = name; + this.metadata = metadata; + } + + public String name() { + return name; + } + + public ByteBuffer metadata() { + return metadata; + } + } + + public JoinGroupRequest(String groupId, + int sessionTimeout, + String memberId, + String protocolType, + List<GroupProtocol> groupProtocols) { super(new Struct(CURRENT_SCHEMA)); struct.set(GROUP_ID_KEY_NAME, groupId); struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout); - struct.set(TOPICS_KEY_NAME, topics.toArray()); - struct.set(CONSUMER_ID_KEY_NAME, consumerId); - struct.set(STRATEGY_KEY_NAME, strategy); + struct.set(MEMBER_ID_KEY_NAME, memberId); + struct.set(PROTOCOL_TYPE_KEY_NAME, protocolType); + + List<Struct> groupProtocolsList = new ArrayList<>(); + for (GroupProtocol protocol : groupProtocols) { + Struct protocolStruct = struct.instance(GROUP_PROTOCOLS_KEY_NAME); + protocolStruct.set(PROTOCOL_NAME_KEY_NAME, protocol.name); + protocolStruct.set(PROTOCOL_METADATA_KEY_NAME, protocol.metadata); + groupProtocolsList.add(protocolStruct); + } + + struct.set(GROUP_PROTOCOLS_KEY_NAME, groupProtocolsList.toArray()); this.groupId = groupId; this.sessionTimeout = sessionTimeout; - this.topics = topics; - this.consumerId = consumerId; - this.strategy = strategy; + this.memberId = memberId; + this.protocolType = protocolType; + this.groupProtocols = groupProtocols; } public JoinGroupRequest(Struct struct) { super(struct); groupId = struct.getString(GROUP_ID_KEY_NAME); sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME); - Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME); - topics = new ArrayList<String>(); - for (Object topic: topicsArray) - topics.add((String) topic); - consumerId = struct.getString(CONSUMER_ID_KEY_NAME); - strategy = struct.getString(STRATEGY_KEY_NAME); + memberId = struct.getString(MEMBER_ID_KEY_NAME); + protocolType = struct.getString(PROTOCOL_TYPE_KEY_NAME); + + groupProtocols = new ArrayList<>(); + for (Object groupProtocolObj : struct.getArray(GROUP_PROTOCOLS_KEY_NAME)) { + Struct groupProtocolStruct = (Struct) groupProtocolObj; + String name = groupProtocolStruct.getString(PROTOCOL_NAME_KEY_NAME); + ByteBuffer metadata = groupProtocolStruct.getBytes(PROTOCOL_METADATA_KEY_NAME); + groupProtocols.add(new GroupProtocol(name, metadata)); + } } @Override @@ -74,8 +110,10 @@ public class JoinGroupRequest extends AbstractRequest { return new JoinGroupResponse( Errors.forException(e).code(), JoinGroupResponse.UNKNOWN_GENERATION_ID, - JoinGroupResponse.UNKNOWN_CONSUMER_ID, - Collections.<TopicPartition>emptyList()); + JoinGroupResponse.UNKNOWN_PROTOCOL, + JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId + JoinGroupResponse.UNKNOWN_MEMBER_ID, // leaderId + Collections.<String, ByteBuffer>emptyMap()); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id))); @@ -90,16 +128,16 @@ public class JoinGroupRequest extends AbstractRequest { return sessionTimeout; } - public List<String> topics() { - return topics; + public String memberId() { + return memberId; } - public String consumerId() { - return consumerId; + public List<GroupProtocol> groupProtocols() { + return groupProtocols; } - public String strategy() { - return strategy; + public String protocolType() { + return protocolType; } public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) { http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index 7bf544e..c65a4bb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -12,15 +12,16 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class JoinGroupResponse extends AbstractRequestResponse { @@ -30,63 +31,78 @@ public class JoinGroupResponse extends AbstractRequestResponse { /** * Possible error code: * - * CONSUMER_COORDINATOR_NOT_AVAILABLE (15) - * NOT_COORDINATOR_FOR_CONSUMER (16) - * INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY (23) - * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24) - * UNKNOWN_CONSUMER_ID (25) + * GROUP_COORDINATOR_NOT_AVAILABLE (15) + * NOT_COORDINATOR_FOR_GROUP (16) + * INCONSISTENT_GROUP_PROTOCOL (23) + * UNKNOWN_MEMBER_ID (25) * INVALID_SESSION_TIMEOUT (26) */ - private static final String GENERATION_ID_KEY_NAME = "group_generation_id"; - private static final String CONSUMER_ID_KEY_NAME = "consumer_id"; - private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions"; - private static final String TOPIC_KEY_NAME = "topic"; - private static final String PARTITIONS_KEY_NAME = "partitions"; + private static final String GENERATION_ID_KEY_NAME = "generation_id"; + private static final String GROUP_PROTOCOL_KEY_NAME = "group_protocol"; + private static final String LEADER_ID_KEY_NAME = "leader_id"; + private static final String MEMBER_ID_KEY_NAME = "member_id"; + private static final String MEMBERS_KEY_NAME = "members"; + private static final String MEMBER_METADATA_KEY_NAME = "member_metadata"; + + public static final String UNKNOWN_PROTOCOL = ""; public static final int UNKNOWN_GENERATION_ID = -1; - public static final String UNKNOWN_CONSUMER_ID = ""; + public static final String UNKNOWN_MEMBER_ID = ""; private final short errorCode; private final int generationId; - private final String consumerId; - private final List<TopicPartition> assignedPartitions; - - public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) { + private final String groupProtocol; + private final String memberId; + private final String leaderId; + private final Map<String, ByteBuffer> members; + + public JoinGroupResponse(short errorCode, + int generationId, + String groupProtocol, + String memberId, + String leaderId, + Map<String, ByteBuffer> groupMembers) { super(new Struct(CURRENT_SCHEMA)); - Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions); - struct.set(ERROR_CODE_KEY_NAME, errorCode); struct.set(GENERATION_ID_KEY_NAME, generationId); - struct.set(CONSUMER_ID_KEY_NAME, consumerId); - List<Struct> topicArray = new ArrayList<Struct>(); - for (Map.Entry<String, List<Integer>> entries: partitionsByTopic.entrySet()) { - Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME); - topicData.set(TOPIC_KEY_NAME, entries.getKey()); - topicData.set(PARTITIONS_KEY_NAME, entries.getValue().toArray()); - topicArray.add(topicData); + struct.set(GROUP_PROTOCOL_KEY_NAME, groupProtocol); + struct.set(MEMBER_ID_KEY_NAME, memberId); + struct.set(LEADER_ID_KEY_NAME, leaderId); + + List<Struct> memberArray = new ArrayList<>(); + for (Map.Entry<String, ByteBuffer> entries: groupMembers.entrySet()) { + Struct memberData = struct.instance(MEMBERS_KEY_NAME); + memberData.set(MEMBER_ID_KEY_NAME, entries.getKey()); + memberData.set(MEMBER_METADATA_KEY_NAME, entries.getValue()); + memberArray.add(memberData); } - struct.set(ASSIGNED_PARTITIONS_KEY_NAME, topicArray.toArray()); + struct.set(MEMBERS_KEY_NAME, memberArray.toArray()); this.errorCode = errorCode; this.generationId = generationId; - this.consumerId = consumerId; - this.assignedPartitions = assignedPartitions; + this.groupProtocol = groupProtocol; + this.memberId = memberId; + this.leaderId = leaderId; + this.members = groupMembers; } public JoinGroupResponse(Struct struct) { super(struct); - assignedPartitions = new ArrayList<TopicPartition>(); - for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) { - Struct topicData = (Struct) topicDataObj; - String topic = topicData.getString(TOPIC_KEY_NAME); - for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_NAME)) - assignedPartitions.add(new TopicPartition(topic, (Integer) partitionObj)); + members = new HashMap<>(); + + for (Object memberDataObj : struct.getArray(MEMBERS_KEY_NAME)) { + Struct memberData = (Struct) memberDataObj; + String memberId = memberData.getString(MEMBER_ID_KEY_NAME); + ByteBuffer memberMetadata = memberData.getBytes(MEMBER_METADATA_KEY_NAME); + members.put(memberId, memberMetadata); } errorCode = struct.getShort(ERROR_CODE_KEY_NAME); generationId = struct.getInt(GENERATION_ID_KEY_NAME); - consumerId = struct.getString(CONSUMER_ID_KEY_NAME); + groupProtocol = struct.getString(GROUP_PROTOCOL_KEY_NAME); + memberId = struct.getString(MEMBER_ID_KEY_NAME); + leaderId = struct.getString(LEADER_ID_KEY_NAME); } public short errorCode() { @@ -97,12 +113,24 @@ public class JoinGroupResponse extends AbstractRequestResponse { return generationId; } - public String consumerId() { - return consumerId; + public String groupProtocol() { + return groupProtocol; + } + + public String memberId() { + return memberId; + } + + public String leaderId() { + return leaderId; + } + + public boolean isLeader() { + return memberId.equals(leaderId); } - public List<TopicPartition> assignedPartitions() { - return assignedPartitions; + public Map<String, ByteBuffer> members() { + return members; } public static JoinGroupResponse parse(ByteBuffer buffer) { http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 03df1e7..8721efa 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -34,7 +34,7 @@ public class OffsetCommitRequest extends AbstractRequest { private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id); private static final String GROUP_ID_KEY_NAME = "group_id"; private static final String GENERATION_ID_KEY_NAME = "group_generation_id"; - private static final String CONSUMER_ID_KEY_NAME = "consumer_id"; + private static final String MEMBER_ID_KEY_NAME = "member_id"; private static final String TOPICS_KEY_NAME = "topics"; private static final String RETENTION_TIME_KEY_NAME = "retention_time"; @@ -52,7 +52,7 @@ public class OffsetCommitRequest extends AbstractRequest { // default values for the current version public static final int DEFAULT_GENERATION_ID = -1; - public static final String DEFAULT_CONSUMER_ID = ""; + public static final String DEFAULT_MEMBER_ID = ""; public static final long DEFAULT_RETENTION_TIME = -1L; // default values for old versions, @@ -61,7 +61,7 @@ public class OffsetCommitRequest extends AbstractRequest { public static final long DEFAULT_TIMESTAMP = -1L; // for V0, V1 private final String groupId; - private final String consumerId; + private final String memberId; private final int generationId; private final long retentionTime; private final Map<TopicPartition, PartitionData> offsetData; @@ -97,7 +97,7 @@ public class OffsetCommitRequest extends AbstractRequest { initCommonFields(groupId, offsetData); this.groupId = groupId; this.generationId = DEFAULT_GENERATION_ID; - this.consumerId = DEFAULT_CONSUMER_ID; + this.memberId = DEFAULT_MEMBER_ID; this.retentionTime = DEFAULT_RETENTION_TIME; this.offsetData = offsetData; } @@ -106,19 +106,19 @@ public class OffsetCommitRequest extends AbstractRequest { * Constructor for version 1. * @param groupId * @param generationId - * @param consumerId + * @param memberId * @param offsetData */ @Deprecated - public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) { + public OffsetCommitRequest(String groupId, int generationId, String memberId, Map<TopicPartition, PartitionData> offsetData) { super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1))); initCommonFields(groupId, offsetData); struct.set(GENERATION_ID_KEY_NAME, generationId); - struct.set(CONSUMER_ID_KEY_NAME, consumerId); + struct.set(MEMBER_ID_KEY_NAME, memberId); this.groupId = groupId; this.generationId = generationId; - this.consumerId = consumerId; + this.memberId = memberId; this.retentionTime = DEFAULT_RETENTION_TIME; this.offsetData = offsetData; } @@ -127,20 +127,20 @@ public class OffsetCommitRequest extends AbstractRequest { * Constructor for version 2. * @param groupId * @param generationId - * @param consumerId + * @param memberId * @param retentionTime * @param offsetData */ - public OffsetCommitRequest(String groupId, int generationId, String consumerId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) { + public OffsetCommitRequest(String groupId, int generationId, String memberId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) { super(new Struct(CURRENT_SCHEMA)); initCommonFields(groupId, offsetData); struct.set(GENERATION_ID_KEY_NAME, generationId); - struct.set(CONSUMER_ID_KEY_NAME, consumerId); + struct.set(MEMBER_ID_KEY_NAME, memberId); struct.set(RETENTION_TIME_KEY_NAME, retentionTime); this.groupId = groupId; this.generationId = generationId; - this.consumerId = consumerId; + this.memberId = memberId; this.retentionTime = retentionTime; this.offsetData = offsetData; } @@ -183,10 +183,10 @@ public class OffsetCommitRequest extends AbstractRequest { generationId = DEFAULT_GENERATION_ID; // This field only exists in v1. - if (struct.hasField(CONSUMER_ID_KEY_NAME)) - consumerId = struct.getString(CONSUMER_ID_KEY_NAME); + if (struct.hasField(MEMBER_ID_KEY_NAME)) + memberId = struct.getString(MEMBER_ID_KEY_NAME); else - consumerId = DEFAULT_CONSUMER_ID; + memberId = DEFAULT_MEMBER_ID; // This field only exists in v2 if (struct.hasField(RETENTION_TIME_KEY_NAME)) @@ -243,8 +243,8 @@ public class OffsetCommitRequest extends AbstractRequest { return generationId; } - public String consumerId() { - return consumerId; + public String memberId() { + return memberId; } public long retentionTime() { http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index a163333..dae9c37 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -42,10 +42,10 @@ public class OffsetCommitResponse extends AbstractRequestResponse { * Possible error code: * * OFFSET_METADATA_TOO_LARGE (12) - * CONSUMER_COORDINATOR_NOT_AVAILABLE (15) - * NOT_COORDINATOR_FOR_CONSUMER (16) + * GROUP_COORDINATOR_NOT_AVAILABLE (15) + * NOT_COORDINATOR_FOR_GROUP (16) * ILLEGAL_GENERATION (22) - * UNKNOWN_CONSUMER_ID (25) + * UNKNOWN_MEMBER_ID (25) * COMMITTING_PARTITIONS_NOT_ASSIGNED (27) * INVALID_COMMIT_OFFSET_SIZE (28) */ http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 3dc8521..09ac74a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -49,9 +49,9 @@ public class OffsetFetchResponse extends AbstractRequestResponse { * * UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0 * OFFSET_LOAD_IN_PROGRESS (14) - * NOT_COORDINATOR_FOR_CONSUMER (16) + * NOT_COORDINATOR_FOR_GROUP (16) * ILLEGAL_GENERATION (22) - * UNKNOWN_CONSUMER_ID (25) + * UNKNOWN_MEMBER_ID (25) */ private final Map<TopicPartition, PartitionData> responseData; http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java new file mode 100644 index 0000000..606584b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SyncGroupRequest extends AbstractRequest { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.SYNC_GROUP.id); + public static final String GROUP_ID_KEY_NAME = "group_id"; + public static final String GENERATION_ID_KEY_NAME = "generation_id"; + public static final String MEMBER_ID_KEY_NAME = "member_id"; + public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment"; + public static final String GROUP_ASSIGNMENT_KEY_NAME = "group_assignment"; + + private final String groupId; + private final int generationId; + private final String memberId; + private final Map<String, ByteBuffer> groupAssignment; + + public SyncGroupRequest(String groupId, + int generationId, + String memberId, + Map<String, ByteBuffer> groupAssignment) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(GROUP_ID_KEY_NAME, groupId); + struct.set(GENERATION_ID_KEY_NAME, generationId); + struct.set(MEMBER_ID_KEY_NAME, memberId); + + List<Struct> memberArray = new ArrayList<>(); + for (Map.Entry<String, ByteBuffer> entries: groupAssignment.entrySet()) { + Struct memberData = struct.instance(GROUP_ASSIGNMENT_KEY_NAME); + memberData.set(MEMBER_ID_KEY_NAME, entries.getKey()); + memberData.set(MEMBER_ASSIGNMENT_KEY_NAME, entries.getValue()); + memberArray.add(memberData); + } + struct.set(GROUP_ASSIGNMENT_KEY_NAME, memberArray.toArray()); + + this.groupId = groupId; + this.generationId = generationId; + this.memberId = memberId; + this.groupAssignment = groupAssignment; + } + + public SyncGroupRequest(Struct struct) { + super(struct); + this.groupId = struct.getString(GROUP_ID_KEY_NAME); + this.generationId = struct.getInt(GENERATION_ID_KEY_NAME); + this.memberId = struct.getString(MEMBER_ID_KEY_NAME); + + groupAssignment = new HashMap<>(); + + for (Object memberDataObj : struct.getArray(GROUP_ASSIGNMENT_KEY_NAME)) { + Struct memberData = (Struct) memberDataObj; + String memberId = memberData.getString(MEMBER_ID_KEY_NAME); + ByteBuffer memberMetadata = memberData.getBytes(MEMBER_ASSIGNMENT_KEY_NAME); + groupAssignment.put(memberId, memberMetadata); + } + } + + @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + switch (versionId) { + case 0: + return new SyncGroupResponse( + Errors.forException(e).code(), + ByteBuffer.wrap(new byte[]{})); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id))); + } + } + + public String groupId() { + return groupId; + } + + public int generationId() { + return generationId; + } + + public Map<String, ByteBuffer> groupAssignment() { + return groupAssignment; + } + + public String memberId() { + return memberId; + } + + public static SyncGroupRequest parse(ByteBuffer buffer, int versionId) { + return new SyncGroupRequest(ProtoUtils.parseRequest(ApiKeys.SYNC_GROUP.id, versionId, buffer)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java new file mode 100644 index 0000000..a96b7e5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; + +public class SyncGroupResponse extends AbstractRequestResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SYNC_GROUP.id); + public static final String ERROR_CODE_KEY_NAME = "error_code"; + public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment"; + + /** + * Possible error codes: + * + * GROUP_COORDINATOR_NOT_AVAILABLE (15) + * NOT_COORDINATOR_FOR_GROUP (16) + * ILLEGAL_GENERATION (22) + * UNKNOWN_MEMBER_ID (25) + * REBALANCE_IN_PROGRESS (30) + * + */ + + private final short errorCode; + private final ByteBuffer memberState; + + public SyncGroupResponse(short errorCode, ByteBuffer memberState) { + super(new Struct(CURRENT_SCHEMA)); + + struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState); + + this.errorCode = errorCode; + this.memberState = memberState; + } + + public SyncGroupResponse(Struct struct) { + super(struct); + + this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + this.memberState = struct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME); + } + + public short errorCode() { + return errorCode; + } + + public ByteBuffer memberAssignment() { + return memberState; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index a7a2968..bc0e645 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -23,6 +23,7 @@ import java.io.StringWriter; import java.io.PrintWriter; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -52,6 +53,18 @@ public class Utils { private static final Logger log = LoggerFactory.getLogger(Utils.class); /** + * Get a sorted list representation of a collection. + * @param collection The collection to sort + * @param <T> The class of objects in the collection + * @return An unmodifiable sorted list with the contents of the collection + */ + public static <T extends Comparable<? super T>> List<T> sorted(Collection<T> collection) { + List<T> res = new ArrayList<>(collection); + Collections.sort(res); + return Collections.unmodifiableList(res); + } + + /** * Turn the given UTF8 byte array into a string * * @param bytes The byte array @@ -114,6 +127,21 @@ public class Utils { } /** + * Get the little-endian value of an integer as a byte array. + * @param val The value to convert to a litte-endian array + * @return The little-endian encoded array of bytes for the value + */ + public static byte[] toArrayLE(int val) { + return new byte[] { + (byte) (val >> 8 * 0), + (byte) (val >> 8 * 1), + (byte) (val >> 8 * 2), + (byte) (val >> 8 * 3) + }; + } + + + /** * Read an unsigned integer stored in little-endian format from a byte array * at a given offset. * http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index b7160a1..55d7608 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -195,7 +195,6 @@ public class MetadataTest { new HashSet<>(Arrays.asList("topic", "topic1")), topics); } - private Thread asyncFetch(final String topic) { Thread thread = new Thread() { public void run() { http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java new file mode 100644 index 0000000..13cce13 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RangeAssignorTest { + + private RangeAssignor assignor = new RangeAssignor(); + + + @Test + public void testOneConsumerNoTopic() { + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, + Collections.singletonMap(consumerId, Collections.<String>emptyList())); + + assertEquals(Collections.singleton(consumerId), assignment.keySet()); + assertTrue(assignment.get(consumerId).isEmpty()); + } + + @Test + public void testOneConsumerNonexistentTopic() { + String topic = "topic"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 0); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, + Collections.singletonMap(consumerId, Arrays.asList(topic))); + assertEquals(Collections.singleton(consumerId), assignment.keySet()); + assertTrue(assignment.get(consumerId).isEmpty()); + } + + @Test + public void testOneConsumerOneTopic() { + String topic = "topic"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, + Collections.singletonMap(consumerId, Arrays.asList(topic))); + + assertEquals(Collections.singleton(consumerId), assignment.keySet()); + assertAssignment(Arrays.asList( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(topic, 2)), assignment.get(consumerId)); + } + + @Test + public void testOnlyAssignsPartitionsFromSubscribedTopics() { + String topic = "topic"; + String otherTopic = "other"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + partitionsPerTopic.put(otherTopic, 3); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, + Collections.singletonMap(consumerId, Arrays.asList(topic))); + assertEquals(Collections.singleton(consumerId), assignment.keySet()); + assertAssignment(Arrays.asList( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(topic, 2)), assignment.get(consumerId)); + } + + @Test + public void testOneConsumerMultipleTopics() { + String topic1 = "topic1"; + String topic2 = "topic2"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic1, 1); + partitionsPerTopic.put(topic2, 2); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, + Collections.singletonMap(consumerId, Arrays.asList(topic1, topic2))); + + assertEquals(Collections.singleton(consumerId), assignment.keySet()); + assertAssignment(Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)), assignment.get(consumerId)); + } + + @Test + public void testTwoConsumersOneTopicOnePartition() { + String topic = "topic"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 1); + + Map<String, List<String>> consumers = new HashMap<>(); + consumers.put(consumer1, Arrays.asList(topic)); + consumers.put(consumer2, Arrays.asList(topic)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); + assertAssignment(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1)); + assertAssignment(Collections.<TopicPartition>emptyList(), assignment.get(consumer2)); + } + + + @Test + public void testTwoConsumersOneTopicTwoPartitions() { + String topic = "topic"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 2); + + Map<String, List<String>> consumers = new HashMap<>(); + consumers.put(consumer1, Arrays.asList(topic)); + consumers.put(consumer2, Arrays.asList(topic)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); + assertAssignment(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1)); + assertAssignment(Arrays.asList(new TopicPartition(topic, 1)), assignment.get(consumer2)); + } + + @Test + public void testMultipleConsumersMixedTopics() { + String topic1 = "topic1"; + String topic2 = "topic2"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + String consumer3 = "consumer3"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic1, 3); + partitionsPerTopic.put(topic2, 2); + + Map<String, List<String>> consumers = new HashMap<>(); + consumers.put(consumer1, Arrays.asList(topic1)); + consumers.put(consumer2, Arrays.asList(topic1, topic2)); + consumers.put(consumer3, Arrays.asList(topic1)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); + assertAssignment(Arrays.asList( + new TopicPartition(topic1, 0)), assignment.get(consumer1)); + assertAssignment(Arrays.asList( + new TopicPartition(topic1, 1), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)), assignment.get(consumer2)); + assertAssignment(Arrays.asList( + new TopicPartition(topic1, 2)), assignment.get(consumer3)); + } + + @Test + public void testTwoConsumersTwoTopicsSixPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic1, 3); + partitionsPerTopic.put(topic2, 3); + + Map<String, List<String>> consumers = new HashMap<>(); + consumers.put(consumer1, Arrays.asList(topic1, topic2)); + consumers.put(consumer2, Arrays.asList(topic1, topic2)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); + assertAssignment(Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)), assignment.get(consumer1)); + assertAssignment(Arrays.asList( + new TopicPartition(topic1, 2), + new TopicPartition(topic2, 2)), assignment.get(consumer2)); + } + + private void assertAssignment(List<TopicPartition> expected, List<TopicPartition> actual) { + // order doesn't matter for assignment, so convert to a set + assertEquals(new HashSet<>(expected), new HashSet<>(actual)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java new file mode 100644 index 0000000..31598cd --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RoundRobinAssignorTest { + + private RoundRobinAssignor assignor = new RoundRobinAssignor(); + + + @Test + public void testOneConsumerNoTopic() { + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, + Collections.singletonMap(consumerId, Collections.<String>emptyList())); + assertEquals(Collections.singleton(consumerId), assignment.keySet()); + assertTrue(assignment.get(consumerId).isEmpty()); + } + + @Test + public void testOneConsumerNonexistentTopic() { + String topic = "topic"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 0); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, + Collections.singletonMap(consumerId, Arrays.asList(topic))); + + assertEquals(Collections.singleton(consumerId), assignment.keySet()); + assertTrue(assignment.get(consumerId).isEmpty()); + } + + @Test + public void testOneConsumerOneTopic() { + String topic = "topic"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, + Collections.singletonMap(consumerId, Arrays.asList(topic))); + assertEquals(Arrays.asList( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(topic, 2)), assignment.get(consumerId)); + } + + @Test + public void testOnlyAssignsPartitionsFromSubscribedTopics() { + String topic = "topic"; + String otherTopic = "other"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 3); + partitionsPerTopic.put(otherTopic, 3); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, + Collections.singletonMap(consumerId, Arrays.asList(topic))); + assertEquals(Arrays.asList( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(topic, 2)), assignment.get(consumerId)); + } + + @Test + public void testOneConsumerMultipleTopics() { + String topic1 = "topic1"; + String topic2 = "topic2"; + String consumerId = "consumer"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic1, 1); + partitionsPerTopic.put(topic2, 2); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, + Collections.singletonMap(consumerId, Arrays.asList(topic1, topic2))); + assertEquals(Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)), assignment.get(consumerId)); + } + + @Test + public void testTwoConsumersOneTopicOnePartition() { + String topic = "topic"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 1); + + Map<String, List<String>> consumers = new HashMap<>(); + consumers.put(consumer1, Arrays.asList(topic)); + consumers.put(consumer2, Arrays.asList(topic)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); + assertEquals(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1)); + assertEquals(Collections.<TopicPartition>emptyList(), assignment.get(consumer2)); + } + + @Test + public void testTwoConsumersOneTopicTwoPartitions() { + String topic = "topic"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic, 2); + + Map<String, List<String>> consumers = new HashMap<>(); + consumers.put(consumer1, Arrays.asList(topic)); + consumers.put(consumer2, Arrays.asList(topic)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); + assertEquals(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1)); + assertEquals(Arrays.asList(new TopicPartition(topic, 1)), assignment.get(consumer2)); + } + + @Test + public void testMultipleConsumersMixedTopics() { + String topic1 = "topic1"; + String topic2 = "topic2"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + String consumer3 = "consumer3"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic1, 3); + partitionsPerTopic.put(topic2, 2); + + Map<String, List<String>> consumers = new HashMap<>(); + consumers.put(consumer1, Arrays.asList(topic1)); + consumers.put(consumer2, Arrays.asList(topic1, topic2)); + consumers.put(consumer3, Arrays.asList(topic1)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); + assertEquals(Arrays.asList( + new TopicPartition(topic1, 0)), assignment.get(consumer1)); + assertEquals(Arrays.asList( + new TopicPartition(topic1, 1), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)), assignment.get(consumer2)); + assertEquals(Arrays.asList( + new TopicPartition(topic1, 2)), assignment.get(consumer3)); + } + + @Test + public void testTwoConsumersTwoTopicsSixPartitions() { + String topic1 = "topic1"; + String topic2 = "topic2"; + String consumer1 = "consumer1"; + String consumer2 = "consumer2"; + + Map<String, Integer> partitionsPerTopic = new HashMap<>(); + partitionsPerTopic.put(topic1, 3); + partitionsPerTopic.put(topic2, 3); + + Map<String, List<String>> consumers = new HashMap<>(); + consumers.put(consumer1, Arrays.asList(topic1, topic2)); + consumers.put(consumer2, Arrays.asList(topic1, topic2)); + + Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers); + assertEquals(Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 2), + new TopicPartition(topic2, 1)), assignment.get(consumer1)); + assertEquals(Arrays.asList( + new TopicPartition(topic1, 1), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 2)), assignment.get(consumer2)); + } + + public static List<String> topics(String... topics) { + return Arrays.asList(topics); + } + + public static TopicPartition tp(String topic, int partition) { + return new TopicPartition(topic, partition); + } + +}
