/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * ConsumerProtocol contains the schemas for consumer subscriptions and assignments for use with
 * Kafka's generalized group management protocol. Below is the version 0 format:
 *
 * <pre>
 * Subscription => Version Topics UserData
 *   Version    => Int16
 *   Topics     => [String]
 *   UserData   => Bytes
 *
 * Assignment => Version TopicPartitions UserData
 *   Version         => Int16
 *   TopicPartitions => [Topic Partitions]
 *     Topic         => String
 *     Partitions    => [Int32]
 *   UserData        => Bytes
 * </pre>
 *
 * The current implementation assumes that future versions will not break compatibility. When
 * it encounters a newer version, it parses it using the current format. This basically means
 * that new versions cannot remove or reorder any of the existing fields.
 */
public class ConsumerProtocol {

    public static final String VERSION_KEY_NAME = "version";
    public static final String TOPICS_KEY_NAME = "topics";
    public static final String TOPIC_KEY_NAME = "topic";
    public static final String PARTITIONS_KEY_NAME = "partitions";
    public static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
    public static final String USER_DATA_KEY_NAME = "user_data";

    public static final short CONSUMER_PROTOCOL_V0 = 0;
    public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema(
            new Field(VERSION_KEY_NAME, Type.INT16));
    // Pre-built header struct shared by every serialized message; always carries version 0.
    private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA)
            .set(VERSION_KEY_NAME, CONSUMER_PROTOCOL_V0);

    public static final Schema SUBSCRIPTION_V0 = new Schema(
            new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
            new Field(USER_DATA_KEY_NAME, Type.BYTES));
    public static final Schema TOPIC_ASSIGNMENT_V0 = new Schema(
            new Field(TOPIC_KEY_NAME, Type.STRING),
            new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
    public static final Schema ASSIGNMENT_V0 = new Schema(
            new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
            new Field(USER_DATA_KEY_NAME, Type.BYTES));

    /**
     * Serialize a subscription as the version 0 header followed by a {@link #SUBSCRIPTION_V0} body.
     *
     * @param subscription the subscription whose topics and user data are written
     * @return a newly allocated buffer, flipped so that it is ready to be read from the start
     */
    public static ByteBuffer serializeSubscription(PartitionAssignor.Subscription subscription) {
        Struct struct = new Struct(SUBSCRIPTION_V0);
        struct.set(USER_DATA_KEY_NAME, subscription.userData());
        struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
        return serializeWithHeader(SUBSCRIPTION_V0, struct);
    }

    /**
     * Parse a subscription (header, then {@link #SUBSCRIPTION_V0} body) from the given buffer.
     *
     * @param buffer the buffer to read the serialized subscription from
     * @return the subscription holding the decoded topics and user data
     * @throws SchemaException if the version in the header is lower than {@link #CONSUMER_PROTOCOL_V0}
     */
    public static PartitionAssignor.Subscription deserializeSubscription(ByteBuffer buffer) {
        Struct struct = readBody(buffer, SUBSCRIPTION_V0);
        ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
        List<String> topics = new ArrayList<>();
        for (Object topicObj : struct.getArray(TOPICS_KEY_NAME)) {
            topics.add((String) topicObj);
        }
        return new PartitionAssignor.Subscription(topics, userData);
    }

    /**
     * Parse an assignment (header, then {@link #ASSIGNMENT_V0} body) from the given buffer. The
     * per-topic partition lists are flattened into a single list of {@link TopicPartition}s.
     *
     * @param buffer the buffer to read the serialized assignment from
     * @return the assignment holding the decoded partitions and user data
     * @throws SchemaException if the version in the header is lower than {@link #CONSUMER_PROTOCOL_V0}
     */
    public static PartitionAssignor.Assignment deserializeAssignment(ByteBuffer buffer) {
        Struct struct = readBody(buffer, ASSIGNMENT_V0);
        ByteBuffer userData = struct.getBytes(USER_DATA_KEY_NAME);
        List<TopicPartition> partitions = new ArrayList<>();
        for (Object structObj : struct.getArray(TOPIC_PARTITIONS_KEY_NAME)) {
            Struct assignment = (Struct) structObj;
            String topic = assignment.getString(TOPIC_KEY_NAME);
            for (Object partitionObj : assignment.getArray(PARTITIONS_KEY_NAME)) {
                Integer partition = (Integer) partitionObj;
                partitions.add(new TopicPartition(topic, partition));
            }
        }
        return new PartitionAssignor.Assignment(partitions, userData);
    }

    /**
     * Serialize an assignment as the version 0 header followed by an {@link #ASSIGNMENT_V0} body.
     * Partitions are grouped by topic before being written; because the grouping uses a
     * {@link HashMap}, the order of the topic entries in the output is unspecified.
     *
     * @param assignment the assignment whose partitions and user data are written
     * @return a newly allocated buffer, flipped so that it is ready to be read from the start
     */
    public static ByteBuffer serializeAssignment(PartitionAssignor.Assignment assignment) {
        Struct struct = new Struct(ASSIGNMENT_V0);
        struct.set(USER_DATA_KEY_NAME, assignment.userData());
        List<Struct> topicAssignments = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> topicEntry : asMap(assignment.partitions()).entrySet()) {
            Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT_V0);
            topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
            topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
            topicAssignments.add(topicAssignment);
        }
        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicAssignments.toArray());
        return serializeWithHeader(ASSIGNMENT_V0, struct);
    }

    /**
     * Write the version 0 header followed by {@code body} into a buffer sized to fit both exactly.
     */
    private static ByteBuffer serializeWithHeader(Schema schema, Struct body) {
        ByteBuffer buffer = ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V0.sizeOf() + schema.sizeOf(body));
        CONSUMER_PROTOCOL_HEADER_V0.writeTo(buffer);
        schema.write(buffer, body);
        // switch the buffer from writing to reading before handing it to the caller
        buffer.flip();
        return buffer;
    }

    /**
     * Read the protocol header, validate its version, and then read the body using {@code schema}.
     */
    private static Struct readBody(ByteBuffer buffer, Schema schema) {
        Struct header = (Struct) CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
        short version = header.getShort(VERSION_KEY_NAME);
        checkVersionCompatibility(version);
        return (Struct) schema.read(buffer);
    }

    /**
     * Reject versions that can never be valid. This check is shared by subscriptions and
     * assignments, so the error message does not name either one.
     *
     * @throws SchemaException if {@code version} is lower than {@link #CONSUMER_PROTOCOL_V0}
     */
    private static void checkVersionCompatibility(short version) {
        // check for invalid versions
        if (version < CONSUMER_PROTOCOL_V0) {
            throw new SchemaException("Unsupported consumer protocol version: " + version);
        }

        // otherwise, assume versions can be parsed as V0
    }

    /**
     * Group partitions by topic name, keeping each topic's partition ids in encounter order.
     */
    private static Map<String, List<Integer>> asMap(Collection<TopicPartition> partitions) {
        Map<String, List<Integer>> partitionMap = new HashMap<>();
        for (TopicPartition partition : partitions) {
            String topic = partition.topic();
            List<Integer> topicPartitions = partitionMap.get(topic);
            if (topicPartitions == null) {
                topicPartitions = new ArrayList<>();
                partitionMap.put(topic, topicPartitions);
            }
            topicPartitions.add(partition.partition());
        }
        return partitionMap;
    }

}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java deleted file mode 100644 index 98193e8..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ /dev/null @@ -1,848 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ -package org.apache.kafka.clients.consumer.internals; - -import org.apache.kafka.clients.ClientResponse; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerWakeupException; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.DisconnectException; -import org.apache.kafka.common.errors.UnknownConsumerIdException; -import org.apache.kafka.common.metrics.Measurable; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.Count; -import org.apache.kafka.common.metrics.stats.Max; -import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.ConsumerMetadataRequest; -import org.apache.kafka.common.requests.ConsumerMetadataResponse; -import org.apache.kafka.common.requests.HeartbeatRequest; -import org.apache.kafka.common.requests.HeartbeatResponse; -import org.apache.kafka.common.requests.JoinGroupRequest; -import org.apache.kafka.common.requests.JoinGroupResponse; -import org.apache.kafka.common.requests.OffsetCommitRequest; -import org.apache.kafka.common.requests.OffsetCommitResponse; -import org.apache.kafka.common.requests.OffsetFetchRequest; -import org.apache.kafka.common.requests.OffsetFetchResponse; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import 
java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -/** - * This class manages the coordination process with the consumer coordinator. - */ -public final class Coordinator implements Closeable { - - private static final Logger log = LoggerFactory.getLogger(Coordinator.class); - - private final ConsumerNetworkClient client; - private final Time time; - private final String groupId; - private final Heartbeat heartbeat; - private final HeartbeatTask heartbeatTask; - private final int sessionTimeoutMs; - private final String assignmentStrategy; - private final SubscriptionState subscriptions; - private final CoordinatorMetrics sensors; - private final long requestTimeoutMs; - private final long retryBackoffMs; - private final OffsetCommitCallback defaultOffsetCommitCallback; - private final boolean autoCommitEnabled; - - private Node consumerCoordinator; - private String consumerId; - private int generation; - - /** - * Initialize the coordination manager. 
- */ - public Coordinator(ConsumerNetworkClient client, - String groupId, - int sessionTimeoutMs, - int heartbeatIntervalMs, - String assignmentStrategy, - SubscriptionState subscriptions, - Metrics metrics, - String metricGrpPrefix, - Map<String, String> metricTags, - Time time, - long requestTimeoutMs, - long retryBackoffMs, - OffsetCommitCallback defaultOffsetCommitCallback, - boolean autoCommitEnabled, - long autoCommitIntervalMs) { - this.client = client; - this.time = time; - this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID; - this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID; - this.groupId = groupId; - this.consumerCoordinator = null; - this.subscriptions = subscriptions; - this.sessionTimeoutMs = sessionTimeoutMs; - this.assignmentStrategy = assignmentStrategy; - this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds()); - this.heartbeatTask = new HeartbeatTask(); - this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags); - this.requestTimeoutMs = requestTimeoutMs; - this.retryBackoffMs = retryBackoffMs; - this.defaultOffsetCommitCallback = defaultOffsetCommitCallback; - this.autoCommitEnabled = autoCommitEnabled; - - if (autoCommitEnabled) - scheduleAutoCommitTask(autoCommitIntervalMs); - } - - /** - * Refresh the committed offsets for provided partitions. - */ - public void refreshCommittedOffsetsIfNeeded() { - if (subscriptions.refreshCommitsNeeded()) { - Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions()); - for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) { - TopicPartition tp = entry.getKey(); - // verify assignment is still active - if (subscriptions.isAssigned(tp)) - this.subscriptions.committed(tp, entry.getValue()); - } - this.subscriptions.commitsRefreshed(); - } - } - - /** - * Fetch the current committed offsets from the coordinator for a set of partitions. 
- * @param partitions The partitions to fetch offsets for - * @return A map from partition to the committed offset - */ - public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) { - while (true) { - ensureCoordinatorKnown(); - - // contact coordinator to fetch committed offsets - RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions); - client.poll(future); - - if (future.succeeded()) - return future.value(); - - if (!future.isRetriable()) - throw future.exception(); - - Utils.sleep(retryBackoffMs); - } - } - - /** - * Ensure that we have a valid partition assignment from the coordinator. - */ - public void ensurePartitionAssignment() { - if (!subscriptions.partitionAssignmentNeeded()) - return; - - // commit offsets prior to rebalance if auto-commit enabled - maybeAutoCommitOffsetsSync(); - - ConsumerRebalanceListener listener = subscriptions.listener(); - - // execute the user's listener before rebalance - log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions()); - try { - Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions()); - listener.onPartitionsRevoked(revoked); - } catch (Exception e) { - log.error("User provided listener " + listener.getClass().getName() - + " failed on partition revocation: ", e); - } - - reassignPartitions(); - - // execute the user's listener after rebalance - log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions()); - try { - Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions()); - listener.onPartitionsAssigned(assigned); - } catch (Exception e) { - log.error("User provided listener " + listener.getClass().getName() - + " failed on partition assignment: ", e); - } - } - - private void reassignPartitions() { - while (subscriptions.partitionAssignmentNeeded()) { - ensureCoordinatorKnown(); - - // ensure that there are no 
pending requests to the coordinator. This is important - // in particular to avoid resending a pending JoinGroup request. - if (client.pendingRequestCount(this.consumerCoordinator) > 0) { - client.awaitPendingRequests(this.consumerCoordinator); - continue; - } - - RequestFuture<Void> future = sendJoinGroupRequest(); - client.poll(future); - - if (future.failed()) { - if (future.exception() instanceof UnknownConsumerIdException) - continue; - else if (!future.isRetriable()) - throw future.exception(); - Utils.sleep(retryBackoffMs); - } - } - } - - /** - * Block until the coordinator for this group is known. - */ - public void ensureCoordinatorKnown() { - while (coordinatorUnknown()) { - RequestFuture<Void> future = sendConsumerMetadataRequest(); - client.poll(future, requestTimeoutMs); - - if (future.failed()) - client.awaitMetadataUpdate(); - } - } - - - @Override - public void close() { - // commit offsets prior to closing if auto-commit enabled - while (true) { - try { - maybeAutoCommitOffsetsSync(); - return; - } catch (ConsumerWakeupException e) { - // ignore wakeups while closing to ensure we have a chance to commit - continue; - } - } - } - - private class HeartbeatTask implements DelayedTask { - - public void reset() { - // start or restart the heartbeat task to be executed at the next chance - long now = time.milliseconds(); - heartbeat.resetSessionTimeout(now); - client.unschedule(this); - client.schedule(this, now); - } - - @Override - public void run(final long now) { - if (!subscriptions.partitionsAutoAssigned() || - subscriptions.partitionAssignmentNeeded() || - coordinatorUnknown()) - // no need to send if we're not using auto-assignment or if we are - // awaiting a rebalance - return; - - if (heartbeat.sessionTimeoutExpired(now)) { - // we haven't received a successful heartbeat in one session interval - // so mark the coordinator dead - coordinatorDead(); - return; - } - - if (!heartbeat.shouldHeartbeat(now)) { - // we don't need to heartbeat now, 
so reschedule for when we do - client.schedule(this, now + heartbeat.timeToNextHeartbeat(now)); - } else { - heartbeat.sentHeartbeat(now); - RequestFuture<Void> future = sendHeartbeatRequest(); - future.addListener(new RequestFutureListener<Void>() { - @Override - public void onSuccess(Void value) { - long now = time.milliseconds(); - heartbeat.receiveHeartbeat(now); - long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now); - client.schedule(HeartbeatTask.this, nextHeartbeatTime); - } - - @Override - public void onFailure(RuntimeException e) { - client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs); - } - }); - } - } - } - - /** - * Send a request to get a new partition assignment. This is a non-blocking call which sends - * a JoinGroup request to the coordinator (if it is available). The returned future must - * be polled to see if the request completed successfully. - * @return A request future whose completion indicates the result of the JoinGroup request. 
- */ - private RequestFuture<Void> sendJoinGroupRequest() { - if (coordinatorUnknown()) - return RequestFuture.coordinatorNotAvailable(); - - // send a join group request to the coordinator - List<String> subscribedTopics = new ArrayList<String>(subscriptions.subscription()); - log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics); - - JoinGroupRequest request = new JoinGroupRequest(groupId, - this.sessionTimeoutMs, - subscribedTopics, - this.consumerId, - this.assignmentStrategy); - - // create the request for the coordinator - log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.consumerCoordinator.id()); - return client.send(consumerCoordinator, ApiKeys.JOIN_GROUP, request) - .compose(new JoinGroupResponseHandler()); - } - - private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, Void> { - - @Override - public JoinGroupResponse parse(ClientResponse response) { - return new JoinGroupResponse(response.responseBody()); - } - - @Override - public void handle(JoinGroupResponse joinResponse, RequestFuture<Void> future) { - // process the response - short errorCode = joinResponse.errorCode(); - - if (errorCode == Errors.NONE.code()) { - Coordinator.this.consumerId = joinResponse.consumerId(); - Coordinator.this.generation = joinResponse.generationId(); - - // set the flag to refresh last committed offsets - subscriptions.needRefreshCommits(); - - log.debug("Joined group: {}", joinResponse.toStruct()); - - // record re-assignment time - sensors.partitionReassignments.record(response.requestLatencyMs()); - - // update partition assignment - subscriptions.changePartitionAssignment(joinResponse.assignedPartitions()); - heartbeatTask.reset(); - future.complete(null); - } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) { - // reset the consumer id and retry immediately - Coordinator.this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID; - log.info("Attempt to 
join group {} failed due to unknown consumer id, resetting and retrying.", - groupId); - future.raise(Errors.UNKNOWN_CONSUMER_ID); - } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() - || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { - // re-discover the coordinator and retry with backoff - coordinatorDead(); - log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.", - groupId); - future.raise(Errors.forCode(errorCode)); - } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code() - || errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code() - || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) { - // log the error and re-throw the exception - Errors error = Errors.forCode(errorCode); - log.error("Attempt to join group {} failed due to: {}", - groupId, error.exception().getMessage()); - future.raise(error); - } else { - // unexpected error, throw the exception - future.raise(new KafkaException("Unexpected error in join group response: " - + Errors.forCode(joinResponse.errorCode()).exception().getMessage())); - } - } - } - - public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { - this.subscriptions.needRefreshCommits(); - RequestFuture<Void> future = sendOffsetCommitRequest(offsets); - final OffsetCommitCallback cb = callback == null ? 
defaultOffsetCommitCallback : callback; - future.addListener(new RequestFutureListener<Void>() { - @Override - public void onSuccess(Void value) { - cb.onComplete(offsets, null); - } - - @Override - public void onFailure(RuntimeException e) { - cb.onComplete(offsets, e); - } - }); - } - - public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) { - if (offsets.isEmpty()) - return; - - while (true) { - ensureCoordinatorKnown(); - - RequestFuture<Void> future = sendOffsetCommitRequest(offsets); - client.poll(future); - - if (future.succeeded()) { - return; - } - - if (!future.isRetriable()) { - throw future.exception(); - } - - Utils.sleep(retryBackoffMs); - } - } - - private void scheduleAutoCommitTask(final long interval) { - DelayedTask task = new DelayedTask() { - public void run(long now) { - commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() { - @Override - public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { - if (exception != null) - log.error("Auto offset commit failed.", exception); - } - }); - client.schedule(this, now + interval); - } - }; - client.schedule(task, time.milliseconds() + interval); - } - - private void maybeAutoCommitOffsetsSync() { - if (autoCommitEnabled) { - try { - commitOffsetsSync(subscriptions.allConsumed()); - } catch (ConsumerWakeupException e) { - // rethrow wakeups since they are triggered by the user - throw e; - } catch (Exception e) { - // consistent with async auto-commit failures, we do not propagate the exception - log.error("Auto offset commit failed.", e); - } - } - } - - /** - * Reset the generation/consumerId tracked by this consumer. - */ - public void resetGeneration() { - this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID; - this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID; - } - - /** - * Commit offsets for the specified list of topics and partitions. 
This is a non-blocking call - * which returns a request future that can be polled in the case of a synchronous commit or ignored in the - * asynchronous case. - * - * @param offsets The list of offsets per partition that should be committed. - * @return A request future whose value indicates whether the commit was successful or not - */ - private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) { - if (coordinatorUnknown()) - return RequestFuture.coordinatorNotAvailable(); - - if (offsets.isEmpty()) - return RequestFuture.voidSuccess(); - - // create the offset commit request - Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size()); - for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) { - OffsetAndMetadata offsetAndMetadata = entry.getValue(); - offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData( - offsetAndMetadata.offset(), offsetAndMetadata.metadata())); - } - - OffsetCommitRequest req = new OffsetCommitRequest(this.groupId, - this.generation, - this.consumerId, - OffsetCommitRequest.DEFAULT_RETENTION_TIME, - offsetData); - - return client.send(consumerCoordinator, ApiKeys.OFFSET_COMMIT, req) - .compose(new OffsetCommitResponseHandler(offsets)); - } - - public static class DefaultOffsetCommitCallback implements OffsetCommitCallback { - @Override - public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { - if (exception != null) - log.error("Offset commit failed.", exception); - } - } - - private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> { - - private final Map<TopicPartition, OffsetAndMetadata> offsets; - - public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) { - this.offsets = offsets; - } - - @Override - public OffsetCommitResponse parse(ClientResponse response) { - return new 
OffsetCommitResponse(response.responseBody()); - } - - @Override - public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) { - sensors.commitLatency.record(response.requestLatencyMs()); - for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) { - TopicPartition tp = entry.getKey(); - OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp); - long offset = offsetAndMetadata.offset(); - - short errorCode = entry.getValue(); - if (errorCode == Errors.NONE.code()) { - log.debug("Committed offset {} for partition {}", offset, tp); - if (subscriptions.isAssigned(tp)) - // update the local cache only if the partition is still assigned - subscriptions.committed(tp, offsetAndMetadata); - } else { - if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() - || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { - coordinatorDead(); - } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code() - || errorCode == Errors.ILLEGAL_GENERATION.code()) { - // need to re-join group - subscriptions.needReassignment(); - } - - log.error("Error committing partition {} at offset {}: {}", - tp, - offset, - Errors.forCode(errorCode).exception().getMessage()); - - future.raise(Errors.forCode(errorCode)); - return; - } - } - - future.complete(null); - } - } - - /** - * Fetch the committed offsets for a set of partitions. This is a non-blocking call. The - * returned future can be polled to get the actual offsets returned from the broker. - * - * @param partitions The set of partitions to get offsets for. - * @return A request future containing the committed offsets. 
- */ - private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) { - if (coordinatorUnknown()) - return RequestFuture.coordinatorNotAvailable(); - - log.debug("Fetching committed offsets for partitions: {}", Utils.join(partitions, ", ")); - // construct the request - OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions)); - - // send the request with a callback - return client.send(consumerCoordinator, ApiKeys.OFFSET_FETCH, request) - .compose(new OffsetFetchResponseHandler()); - } - - private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> { - - @Override - public OffsetFetchResponse parse(ClientResponse response) { - return new OffsetFetchResponse(response.responseBody()); - } - - @Override - public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) { - Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size()); - for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) { - TopicPartition tp = entry.getKey(); - OffsetFetchResponse.PartitionData data = entry.getValue(); - if (data.hasError()) { - log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode) - .exception() - .getMessage()); - if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) { - // just retry - future.raise(Errors.OFFSET_LOAD_IN_PROGRESS); - } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { - // re-discover the coordinator and retry - coordinatorDead(); - future.raise(Errors.NOT_COORDINATOR_FOR_CONSUMER); - } else if (data.errorCode == Errors.UNKNOWN_CONSUMER_ID.code() - || data.errorCode == Errors.ILLEGAL_GENERATION.code()) { - // need to re-join group - subscriptions.needReassignment(); - 
future.raise(Errors.forCode(data.errorCode)); - } else { - future.raise(new KafkaException("Unexpected error in fetch offset response: " - + Errors.forCode(data.errorCode).exception().getMessage())); - } - return; - } else if (data.offset >= 0) { - // record the position with the offset (-1 indicates no committed offset to fetch) - offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata)); - } else { - log.debug("No committed offset for partition " + tp); - } - } - - future.complete(offsets); - } - } - - /** - * Send a heartbeat request now (visible only for testing). - */ - public RequestFuture<Void> sendHeartbeatRequest() { - HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.consumerId); - return client.send(consumerCoordinator, ApiKeys.HEARTBEAT, req) - .compose(new HeartbeatCompletionHandler()); - } - - public boolean coordinatorUnknown() { - return this.consumerCoordinator == null; - } - - /** - * Discover the current coordinator for the consumer group. Sends a ConsumerMetadata request to - * one of the brokers. The returned future should be polled to get the result of the request. - * @return A request future which indicates the completion of the metadata request - */ - private RequestFuture<Void> sendConsumerMetadataRequest() { - // initiate the consumer metadata request - // find a node to ask about the coordinator - Node node = this.client.leastLoadedNode(); - if (node == null) { - // TODO: If there are no brokers left, perhaps we should use the bootstrap set - // from configuration? 
- return RequestFuture.noBrokersAvailable(); - } else { - // create a consumer metadata request - log.debug("Issuing consumer metadata request to broker {}", node.id()); - ConsumerMetadataRequest metadataRequest = new ConsumerMetadataRequest(this.groupId); - return client.send(node, ApiKeys.CONSUMER_METADATA, metadataRequest) - .compose(new RequestFutureAdapter<ClientResponse, Void>() { - @Override - public void onSuccess(ClientResponse response, RequestFuture<Void> future) { - handleConsumerMetadataResponse(response, future); - } - }); - } - } - - private void handleConsumerMetadataResponse(ClientResponse resp, RequestFuture<Void> future) { - log.debug("Consumer metadata response {}", resp); - - // parse the response to get the coordinator info if it is not disconnected, - // otherwise we need to request metadata update - if (resp.wasDisconnected()) { - future.raise(new DisconnectException()); - } else if (!coordinatorUnknown()) { - // We already found the coordinator, so ignore the request - future.complete(null); - } else { - ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(resp.responseBody()); - // use MAX_VALUE - node.id as the coordinator id to mimic separate connections - // for the coordinator in the underlying network client layer - // TODO: this needs to be better handled in KAFKA-1935 - if (consumerMetadataResponse.errorCode() == Errors.NONE.code()) { - this.consumerCoordinator = new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(), - consumerMetadataResponse.node().host(), - consumerMetadataResponse.node().port()); - heartbeatTask.reset(); - future.complete(null); - } else { - future.raise(Errors.forCode(consumerMetadataResponse.errorCode())); - } - } - } - - /** - * Mark the current coordinator as dead. 
- */ - private void coordinatorDead() { - if (this.consumerCoordinator != null) { - log.info("Marking the coordinator {} dead.", this.consumerCoordinator.id()); - this.consumerCoordinator = null; - } - } - - private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> { - @Override - public HeartbeatResponse parse(ClientResponse response) { - return new HeartbeatResponse(response.responseBody()); - } - - @Override - public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { - sensors.heartbeatLatency.record(response.requestLatencyMs()); - short error = heartbeatResponse.errorCode(); - if (error == Errors.NONE.code()) { - log.debug("Received successful heartbeat response."); - future.complete(null); - } else if (error == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() - || error == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { - log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead."); - coordinatorDead(); - future.raise(Errors.forCode(error)); - } else if (error == Errors.REBALANCE_IN_PROGRESS.code()) { - log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group."); - subscriptions.needReassignment(); - future.raise(Errors.REBALANCE_IN_PROGRESS); - } else if (error == Errors.ILLEGAL_GENERATION.code()) { - log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group."); - subscriptions.needReassignment(); - future.raise(Errors.ILLEGAL_GENERATION); - } else if (error == Errors.UNKNOWN_CONSUMER_ID.code()) { - log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group."); - consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID; - subscriptions.needReassignment(); - future.raise(Errors.UNKNOWN_CONSUMER_ID); - } else { - future.raise(new KafkaException("Unexpected error in heartbeat response: " - + 
Errors.forCode(error).exception().getMessage())); - } - } - } - - private abstract class CoordinatorResponseHandler<R, T> - extends RequestFutureAdapter<ClientResponse, T> { - protected ClientResponse response; - - public abstract R parse(ClientResponse response); - - public abstract void handle(R response, RequestFuture<T> future); - - @Override - public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) { - this.response = clientResponse; - - if (clientResponse.wasDisconnected()) { - int correlation = response.request().request().header().correlationId(); - log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected", - response.request(), - correlation, - response.request().request().destination()); - - // mark the coordinator as dead - coordinatorDead(); - future.raise(new DisconnectException()); - return; - } - - R response = parse(clientResponse); - handle(response, future); - } - - @Override - public void onFailure(RuntimeException e, RequestFuture<T> future) { - if (e instanceof DisconnectException) { - log.debug("Coordinator request failed", e); - coordinatorDead(); - } - future.raise(e); - } - } - - - private class CoordinatorMetrics { - public final Metrics metrics; - public final String metricGrpName; - - public final Sensor commitLatency; - public final Sensor heartbeatLatency; - public final Sensor partitionReassignments; - - public CoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) { - this.metrics = metrics; - this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; - - this.commitLatency = metrics.sensor("commit-latency"); - this.commitLatency.add(new MetricName("commit-latency-avg", - this.metricGrpName, - "The average time taken for a commit request", - tags), new Avg()); - this.commitLatency.add(new MetricName("commit-latency-max", - this.metricGrpName, - "The max time taken for a commit request", - tags), new Max()); - this.commitLatency.add(new 
MetricName("commit-rate", - this.metricGrpName, - "The number of commit calls per second", - tags), new Rate(new Count())); - - this.heartbeatLatency = metrics.sensor("heartbeat-latency"); - this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max", - this.metricGrpName, - "The max time taken to receive a response to a hearbeat request", - tags), new Max()); - this.heartbeatLatency.add(new MetricName("heartbeat-rate", - this.metricGrpName, - "The average number of heartbeats per second", - tags), new Rate(new Count())); - - this.partitionReassignments = metrics.sensor("reassignment-latency"); - this.partitionReassignments.add(new MetricName("reassignment-time-avg", - this.metricGrpName, - "The average time taken for a partition reassignment", - tags), new Avg()); - this.partitionReassignments.add(new MetricName("reassignment-time-max", - this.metricGrpName, - "The max time taken for a partition reassignment", - tags), new Avg()); - this.partitionReassignments.add(new MetricName("reassignment-rate", - this.metricGrpName, - "The number of partition reassignments per second", - tags), new Rate(new Count())); - - Measurable numParts = - new Measurable() { - public double measure(MetricConfig config, long now) { - return subscriptions.assignedPartitions().size(); - } - }; - metrics.addMetric(new MetricName("assigned-partitions", - this.metricGrpName, - "The number of partitions currently assigned to this consumer", - tags), - numParts); - - Measurable lastHeartbeat = - new Measurable() { - public double measure(MetricConfig config, long now) { - return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS); - } - }; - metrics.addMetric(new MetricName("last-heartbeat-seconds-ago", - this.metricGrpName, - "The number of seconds since the last controller heartbeat", - tags), - lastHeartbeat); - } - } -} 
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 7e55d46..f119552 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -167,20 +167,31 @@ public class Fetcher<K, V> { } } - + /** + * Get topic metadata for all topics in the cluster + * @param timeout time for which getting topic metadata is attempted + * @return The map of topics with their partition information + */ + public Map<String, List<PartitionInfo>> getAllTopicMetadata(long timeout) { + return getTopicMetadata(null, timeout); + } /** * Get metadata for all topics present in Kafka cluster * - * @param timeout time for which getting all topics is attempted - * @return The map of topics and its partitions + * @param topics The list of topics to fetch or null to fetch all + * @param timeout time for which getting topic metadata is attempted + * @return The map of topics with their partition information */ - public Map<String, List<PartitionInfo>> getAllTopics(long timeout) { + public Map<String, List<PartitionInfo>> getTopicMetadata(List<String> topics, long timeout) { + if (topics != null && topics.isEmpty()) + return Collections.emptyMap(); + final HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>(); long startTime = time.milliseconds(); while (time.milliseconds() - startTime < timeout) { - RequestFuture<ClientResponse> requestFuture = sendMetadataRequest(); + RequestFuture<ClientResponse> requestFuture = sendMetadataRequest(topics); if (requestFuture != null) { client.poll(requestFuture); @@ -209,11 +220,12 @@ public class 
Fetcher<K, V> { * Send Metadata Request to least loaded node in Kafka cluster asynchronously * @return A future that indicates result of sent metadata request */ - public RequestFuture<ClientResponse> sendMetadataRequest() { + public RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) { + if (topics == null) + topics = Collections.emptyList(); final Node node = client.leastLoadedNode(); return node == null ? null : - client.send( - node, ApiKeys.METADATA, new MetadataRequest(Collections.<String>emptyList())); + client.send(node, ApiKeys.METADATA, new MetadataRequest(topics)); } /** @@ -448,8 +460,9 @@ public class Fetcher<K, V> { long fetched = this.subscriptions.fetched(partition); long consumed = this.subscriptions.consumed(partition); // Only fetch data for partitions whose previously fetched data has been consumed - if (consumed == fetched) + if (consumed == fetched) { fetch.put(partition, new FetchRequest.PartitionData(fetched, this.fetchSize)); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java new file mode 100644 index 0000000..46bfa75 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.TopicPartition; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * This interface is used to define custom partition assignment for use in + * {@link org.apache.kafka.clients.consumer.KafkaConsumer}. Members of the consumer group subscribe + * to the topics they are interested in and forward their subscriptions to a Kafka broker serving + * as the group coordinator. The coordinator selects one member to perform the group assignment and + * propagates the subscriptions of all members to it. Then {@link #assign(Cluster, Map)} is called + * to perform the assignment and the results are forwarded back to each respective member. + * + * In some cases, it is useful to forward additional metadata to the assignor in order to make + * assignment decisions. For this, you can override {@link #subscription(Set)} and provide custom + * userData in the returned Subscription. For example, to have a rack-aware assignor, an implementation + * can use this user data to forward the rackId belonging to each member. + */ +public interface PartitionAssignor { + + /** + * Return a serializable object representing the local member's subscription. This can include + * additional information as well (e.g. local host/rack information) which can be leveraged in + * {@link #assign(Cluster, Map)}.
+ * @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(List)} + * and variants + * @return Non-null subscription with optional user data + */ + Subscription subscription(Set<String> topics); + + /** + * Perform the group assignment given the member subscriptions and current cluster metadata. + * @param metadata Current topic/broker metadata known by consumer + * @param subscriptions Subscriptions from all members provided through {@link #subscription(Set)} + * @return A map from the members to their respective assignment. This should have one entry + * for each member in the input subscription map. + */ + Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions); + + + /** + * Callback which is invoked when a group member receives its assignment from the leader. + * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, Map)} + */ + void onAssignment(Assignment assignment); + + + /** + * Unique name for this assignor (e.g.
"range" or "roundrobin") + * @return non-null unique name + */ + String name(); + + class Subscription { + private final List<String> topics; + private final ByteBuffer userData; + + public Subscription(List<String> topics, ByteBuffer userData) { + this.topics = topics; + this.userData = userData; + } + + public Subscription(List<String> topics) { + this(topics, ByteBuffer.wrap(new byte[0])); + } + + public List<String> topics() { + return topics; + } + + public ByteBuffer userData() { + return userData; + } + + } + + class Assignment { + private final List<TopicPartition> partitions; + private final ByteBuffer userData; + + public Assignment(List<TopicPartition> partitions, ByteBuffer userData) { + this.partitions = partitions; + this.userData = userData; + } + + public Assignment(List<TopicPartition> partitions) { + this(partitions, ByteBuffer.wrap(new byte[0])); + } + + public List<TopicPartition> partitions() { + return partitions; + } + + public ByteBuffer userData() { + return userData; + } + + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java index f5c1afc..7be99bd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java @@ -175,6 +175,20 @@ public class RequestFuture<T> { return adapted; } + public void chain(final RequestFuture<T> future) { + addListener(new RequestFutureListener<T>() { + @Override + public void onSuccess(T value) { + future.complete(value); + } + + @Override + public void onFailure(RuntimeException e) { + future.raise(e); + } + }); + } + public static <T> 
RequestFuture<T> failure(RuntimeException e) { RequestFuture<T> future = new RequestFuture<T>(); future.raise(e); @@ -188,7 +202,7 @@ public class RequestFuture<T> { } public static <T> RequestFuture<T> coordinatorNotAvailable() { - return failure(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception()); + return failure(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception()); } public static <T> RequestFuture<T> leaderNotAvailable() { http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 25a0e90..6e79a7f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -29,7 +29,7 @@ import java.util.regex.Pattern; /** * A class for tracking the topics, partitions, and offsets for the consumer. A partition * is "assigned" either directly with {@link #assign(List)} (manual assignment) - * or with {@link #changePartitionAssignment(List)} (automatic assignment). + * or with {@link #changePartitionAssignment(Collection)} (automatic assignment). * * Once assigned, the partition is not considered "fetchable" until its initial position has * been set with {@link #seek(TopicPartition, long)}. 
Fetchable partitions track a fetch @@ -54,6 +54,9 @@ public class SubscriptionState { /* the list of topics the user has requested */ private final Set<String> subscription; + /* the list of topics the group has subscribed to (set only for the leader on join group completion) */ + private final Set<String> groupSubscription; + /* the list of partitions the user has requested */ private final Set<TopicPartition> userAssignment; @@ -80,6 +83,7 @@ public class SubscriptionState { this.subscription = new HashSet<>(); this.userAssignment = new HashSet<>(); this.assignment = new HashMap<>(); + this.groupSubscription = new HashSet<>(); this.needsPartitionAssignment = false; this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up this.subscribedPattern = null; @@ -101,6 +105,7 @@ public class SubscriptionState { if (!this.subscription.equals(new HashSet<>(topicsToSubscribe))) { this.subscription.clear(); this.subscription.addAll(topicsToSubscribe); + this.groupSubscription.addAll(topicsToSubscribe); this.needsPartitionAssignment = true; // Remove any assigned partitions which are no longer subscribed to @@ -110,10 +115,22 @@ public class SubscriptionState { it.remove(); } } + } + /** + * Add topics to the current group subscription. This is used by the group leader to ensure + * that it receives metadata updates for all topics that the group is interested in. 
+ * @param topics The topics to add to the group subscription + */ + public void groupSubscribe(Collection<String> topics) { + if (!this.userAssignment.isEmpty()) + throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE); + this.groupSubscription.addAll(topics); } public void needReassignment() { + // drop any group topics that this member is not itself subscribed to + this.groupSubscription.retainAll(subscription); this.needsPartitionAssignment = true; } @@ -142,6 +159,10 @@ public class SubscriptionState { this.subscribedPattern = pattern; } + public boolean hasPatternSubscription() { + return subscribedPattern != null; + } + public void unsubscribe() { this.subscription.clear(); this.assignment.clear(); @@ -154,15 +175,24 @@ public class SubscriptionState { return this.subscribedPattern; } - public void clearAssignment() { - this.assignment.clear(); - this.needsPartitionAssignment = !subscription().isEmpty(); - } - public Set<String> subscription() { return this.subscription; } + /** + * Get the subscription for the group. For the leader, this will include the union of the + * subscriptions of all group members. For followers, it is just that member's subscription. + * This is used when querying topic metadata to detect the metadata changes which would + * require rebalancing. The leader fetches metadata for all topics in the group so that it + * can do the partition assignment (which requires at least partition counts for all topics + * to be assigned).
+ * @return The union of all subscribed topics in the group if this member is the leader + * of the current generation; otherwise it returns the same set as {@link #subscription()} + */ + public Set<String> groupSubscription() { + return this.groupSubscription; + } + public Long fetched(TopicPartition tp) { return assignedState(tp).fetched; } @@ -280,7 +310,7 @@ public class SubscriptionState { for (TopicPartition tp : assignments) if (!this.subscription.contains(tp.topic())) throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic."); - this.clearAssignment(); + this.assignment.clear(); for (TopicPartition tp: assignments) addAssignedPartition(tp); this.needsPartitionAssignment = false; http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/Cluster.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index 60594a7..e6a2e43 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -173,6 +173,16 @@ public final class Cluster { } /** + * Get the number of partitions for the given topic + * @param topic The topic to get the number of partitions for + * @return The number of partitions or null if there is no corresponding metadata + */ + public Integer partitionCountForTopic(String topic) { + List<PartitionInfo> partitionInfos = this.partitionsByTopic.get(topic); + return partitionInfos == null ? null : partitionInfos.size(); + } + + /** * Get all topics. 
* @return a set of all topics */ http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java deleted file mode 100644 index ba9ce82..0000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.common.errors; - -/** - * The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has - * not yet been created. 
- */ -public class ConsumerCoordinatorNotAvailableException extends RetriableException { - - private static final long serialVersionUID = 1L; - - public ConsumerCoordinatorNotAvailableException() { - super(); - } - - public ConsumerCoordinatorNotAvailableException(String message) { - super(message); - } - - public ConsumerCoordinatorNotAvailableException(String message, Throwable cause) { - super(message, cause); - } - - public ConsumerCoordinatorNotAvailableException(Throwable cause) { - super(cause); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java new file mode 100644 index 0000000..c0949e3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.kafka.common.errors; + +/** + * The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has + * not yet been created. + */ +public class GroupCoordinatorNotAvailableException extends RetriableException { + + private static final long serialVersionUID = 1L; + + public GroupCoordinatorNotAvailableException() { + super(); + } + + public GroupCoordinatorNotAvailableException(String message) { + super(message); + } + + public GroupCoordinatorNotAvailableException(String message, Throwable cause) { + super(message, cause); + } + + public GroupCoordinatorNotAvailableException(Throwable cause) { + super(cause); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java deleted file mode 100644 index b6c83b4..0000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.common.errors; - -/** - * The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is - * not a coordinator for. - */ -public class NotCoordinatorForConsumerException extends RetriableException { - - private static final long serialVersionUID = 1L; - - public NotCoordinatorForConsumerException() { - super(); - } - - public NotCoordinatorForConsumerException(String message) { - super(message); - } - - public NotCoordinatorForConsumerException(String message, Throwable cause) { - super(message, cause); - } - - public NotCoordinatorForConsumerException(Throwable cause) { - super(cause); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java new file mode 100644 index 0000000..bc56eb0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.errors; + +/** + * The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is + * not a coordinator for. + */ +public class NotCoordinatorForGroupException extends RetriableException { + + private static final long serialVersionUID = 1L; + + public NotCoordinatorForGroupException() { + super(); + } + + public NotCoordinatorForGroupException(String message) { + super(message); + } + + public NotCoordinatorForGroupException(String message, Throwable cause) { + super(message, cause); + } + + public NotCoordinatorForGroupException(Throwable cause) { + super(cause); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java deleted file mode 100644 index 28bfd72..0000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.kafka.common.errors; - -public class UnknownConsumerIdException extends ApiException { - private static final long serialVersionUID = 1L; - - public UnknownConsumerIdException() { - super(); - } - - public UnknownConsumerIdException(String message, Throwable cause) { - super(message, cause); - } - - public UnknownConsumerIdException(String message) { - super(message); - } - - public UnknownConsumerIdException(Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/errors/UnknownMemberIdException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownMemberIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnknownMemberIdException.java new file mode 100644 index 0000000..f8eab90 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownMemberIdException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class UnknownMemberIdException extends ApiException { + private static final long serialVersionUID = 1L; + + public UnknownMemberIdException() { + super(); + } + + public UnknownMemberIdException(String message, Throwable cause) { + super(message, cause); + } + + public UnknownMemberIdException(String message) { + super(message); + } + + public UnknownMemberIdException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index fab8b02..af7b266 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -30,10 +30,11 @@ public enum ApiKeys { CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"), OFFSET_COMMIT(8, "OffsetCommit"), OFFSET_FETCH(9, "OffsetFetch"), - CONSUMER_METADATA(10, "ConsumerMetadata"), + GROUP_METADATA(10, "GroupMetadata"), JOIN_GROUP(11, "JoinGroup"), HEARTBEAT(12, "Heartbeat"), - LEAVE_GROUP(13, "LeaveGroup"); + LEAVE_GROUP(13, "LeaveGroup"), + SYNC_GROUP(14, "SyncGroup"); private static ApiKeys[] 
codeToType; public static final int MAX_API_KEY; http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 220132f..3191636 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -60,10 +60,10 @@ public enum Errors { new NetworkException("The server disconnected before a response was received.")), OFFSET_LOAD_IN_PROGRESS(14, new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")), - CONSUMER_COORDINATOR_NOT_AVAILABLE(15, - new ConsumerCoordinatorNotAvailableException("The coordinator is not available.")), - NOT_COORDINATOR_FOR_CONSUMER(16, - new NotCoordinatorForConsumerException("This is not the correct coordinator for this consumer.")), + GROUP_COORDINATOR_NOT_AVAILABLE(15, + new GroupCoordinatorNotAvailableException("The group coordinator is not available.")), + NOT_COORDINATOR_FOR_GROUP(16, + new NotCoordinatorForGroupException("This is not the correct coordinator for this group.")), INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")), RECORD_LIST_TOO_LARGE(18, @@ -75,17 +75,13 @@ public enum Errors { INVALID_REQUIRED_ACKS(21, new InvalidRequiredAcksException("Produce request specified an invalid value for required acks.")), ILLEGAL_GENERATION(22, - new IllegalGenerationException("Specified consumer generation id is not valid.")), - INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY(23, - new ApiException("The request partition assignment strategy does not match that of the group.")), - UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY(24, - new ApiException("The request 
partition assignment strategy is unknown to the broker.")), - UNKNOWN_CONSUMER_ID(25, - new UnknownConsumerIdException("The coordinator is not aware of this consumer.")), + new IllegalGenerationException("Specified group generation id is not valid.")), + INCONSISTENT_GROUP_PROTOCOL(23, + new ApiException("The group member's supported protocols are incompatible with those of existing members.")), + UNKNOWN_MEMBER_ID(25, + new UnknownMemberIdException("The coordinator is not aware of this member.")), INVALID_SESSION_TIMEOUT(26, new ApiException("The session timeout is not within an acceptable range.")), - COMMITTING_PARTITIONS_NOT_ASSIGNED(27, - new ApiException("Some of the committing partitions are not assigned the committer")), INVALID_COMMIT_OFFSET_SIZE(28, new ApiException("The committing offset data size is not valid")), AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized.")),
