KAFKA-2464: client-side assignment for new consumer

Author: Jason Gustafson <[email protected]>

Reviewers: Jiangjie Qin, Onur Karaman, Ewen Cheslack-Postava, Guozhang Wang

Closes #165 from hachikuji/KAFKA-2464

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86eb74d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86eb74d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86eb74d9

Branch: refs/heads/trunk
Commit: 86eb74d9236c586af5889fe79f4b9e066c9c2af3
Parents: 6e747d4
Author: Jason Gustafson <[email protected]>
Authored: Wed Oct 21 12:13:42 2015 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Wed Oct 21 12:13:42 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java | 9 +-
 .../kafka/clients/consumer/KafkaConsumer.java | 47 +-
 .../kafka/clients/consumer/RangeAssignor.java | 97 ++
 .../clients/consumer/RoundRobinAssignor.java | 114 +++
 .../consumer/internals/AbstractCoordinator.java | 638 +++++++++++++
 .../internals/AbstractPartitionAssignor.java | 90 ++
 .../consumer/internals/ConsumerCoordinator.java | 595 ++++++++++++
 .../internals/ConsumerNetworkClient.java | 9 +
 .../consumer/internals/ConsumerProtocol.java | 162 ++++
 .../clients/consumer/internals/Coordinator.java | 848 -----------------
 .../clients/consumer/internals/Fetcher.java | 31 +-
 .../consumer/internals/PartitionAssignor.java | 117 +++
 .../consumer/internals/RequestFuture.java | 16 +-
 .../consumer/internals/SubscriptionState.java | 44 +-
 .../java/org/apache/kafka/common/Cluster.java | 10 +
 ...onsumerCoordinatorNotAvailableException.java | 40 -
 .../GroupCoordinatorNotAvailableException.java | 40 +
 .../NotCoordinatorForConsumerException.java | 40 -
 .../errors/NotCoordinatorForGroupException.java | 40 +
 .../errors/UnknownConsumerIdException.java | 33 -
 .../common/errors/UnknownMemberIdException.java | 33 +
 .../apache/kafka/common/protocol/ApiKeys.java | 5 +-
 .../apache/kafka/common/protocol/Errors.java | 22 +-
 .../apache/kafka/common/protocol/Protocol.java | 97 +-
 .../kafka/common/requests/AbstractRequest.java | 6 +-
 .../requests/ConsumerMetadataRequest.java | 65 --
 .../requests/ConsumerMetadataResponse.java | 70 --
 .../common/requests/GroupMetadataRequest.java | 65 ++
 .../common/requests/GroupMetadataResponse.java | 70 ++
 .../kafka/common/requests/HeartbeatRequest.java | 16 +-
 .../common/requests/HeartbeatResponse.java | 6 +-
 .../kafka/common/requests/JoinGroupRequest.java | 96 +-
 .../common/requests/JoinGroupResponse.java | 110 ++-
 .../common/requests/OffsetCommitRequest.java | 34 +-
 .../common/requests/OffsetCommitResponse.java | 6 +-
 .../common/requests/OffsetFetchResponse.java | 4 +-
 .../kafka/common/requests/SyncGroupRequest.java | 118 +++
 .../common/requests/SyncGroupResponse.java | 71 ++
 .../org/apache/kafka/common/utils/Utils.java | 28 +
 .../org/apache/kafka/clients/MetadataTest.java | 1 -
 .../clients/consumer/RangeAssignorTest.java | 217 +++++
 .../consumer/RoundRobinAssignorTest.java | 209 +++++
 .../internals/ConsumerCoordinatorTest.java | 749 +++++++++++++++
 .../internals/ConsumerNetworkClientTest.java | 2 +-
 .../internals/ConsumerProtocolTest.java | 118 +++
 .../consumer/internals/CoordinatorTest.java | 635 -------------
 .../clients/consumer/internals/FetcherTest.java | 4 +-
 .../internals/MockPartitionAssignor.java | 49 +
 .../common/requests/RequestResponseTest.java | 16 +-
 .../kafka/copycat/util/KafkaBasedLogTest.java | 2 +-
 .../src/main/scala/kafka/admin/AclCommand.scala | 22 +-
 .../main/scala/kafka/admin/TopicCommand.scala | 7 +-
 .../kafka/api/ConsumerMetadataRequest.scala | 80 --
 .../kafka/api/ConsumerMetadataResponse.scala | 58 --
 .../scala/kafka/api/GroupMetadataRequest.scala | 80 ++
 .../scala/kafka/api/GroupMetadataResponse.scala | 58 ++
 .../scala/kafka/api/OffsetCommitRequest.scala | 16 +-
 core/src/main/scala/kafka/api/RequestKeys.scala | 5 +-
 .../main/scala/kafka/client/ClientUtils.scala | 4 +-
 .../kafka/common/OffsetMetadataAndError.scala | 5 +-
 core/src/main/scala/kafka/common/Topic.scala | 4 +-
 .../scala/kafka/consumer/SimpleConsumer.scala | 4 +-
 .../kafka/coordinator/ConsumerCoordinator.scala | 535 -----------
 .../coordinator/ConsumerGroupMetadata.scala | 133 ---
 .../kafka/coordinator/ConsumerMetadata.scala | 50 -
 .../kafka/coordinator/CoordinatorMetadata.scala | 160 +---
 .../kafka/coordinator/DelayedHeartbeat.scala | 12 +-
 .../scala/kafka/coordinator/DelayedJoin.scala | 40 +
 .../kafka/coordinator/DelayedRebalance.scala | 40 -
 .../kafka/coordinator/GroupCoordinator.scala | 632 +++++++++++++
 .../scala/kafka/coordinator/GroupMetadata.scala | 209 +++++
 .../kafka/coordinator/MemberMetadata.scala | 99 ++
 .../kafka/coordinator/PartitionAssignor.scala | 125 ---
 .../javaapi/ConsumerMetadataResponse.scala | 47 -
 .../kafka/javaapi/GroupMetadataResponse.scala | 47 +
 .../kafka/security/auth/ResourceType.scala | 6 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 144 +--
 .../main/scala/kafka/server/KafkaConfig.scala | 16 +-
 .../main/scala/kafka/server/KafkaServer.scala | 6 +-
 .../main/scala/kafka/server/OffsetManager.scala | 16 +-
 .../kafka/api/BaseConsumerTest.scala | 25 +-
 .../kafka/api/ConsumerBounceTest.scala | 10 +-
 .../kafka/api/IntegrationTestHarness.scala | 12 +-
 .../integration/kafka/api/QuotasTest.scala | 1 -
 .../integration/kafka/api/SslConsumerTest.scala | 22 -
 .../scala/other/kafka/TestOffsetManager.scala | 4 +-
 .../scala/unit/kafka/admin/AclCommandTest.scala | 8 +-
 .../unit/kafka/admin/TopicCommandTest.scala | 8 +-
 .../api/RequestResponseSerializationTest.scala | 10 +-
 .../unit/kafka/consumer/TopicFilterTest.scala | 10 +-
 .../ConsumerCoordinatorResponseTest.scala | 447 ---------
 .../coordinator/ConsumerGroupMetadataTest.scala | 172 ----
 .../coordinator/CoordinatorMetadataTest.scala | 160 +---
 .../GroupCoordinatorResponseTest.scala | 907 +++++++++++++++
 .../kafka/coordinator/GroupMetadataTest.scala | 249 +++++
 .../kafka/coordinator/MemberMetadataTest.scala | 90 ++
 .../coordinator/PartitionAssignorTest.scala | 305 -------
 .../security/auth/SimpleAclAuthorizerTest.scala | 4 +-
 .../unit/kafka/server/KafkaConfigTest.scala | 4 +-
 .../unit/kafka/server/OffsetCommitTest.scala | 4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala | 6 +-
 101 files changed, 6634 insertions(+), 4428 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 1894822..5cc0419 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -17,9 +17,9 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import
org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.config.SSLConfigs; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.Deserializer; import java.util.HashMap; import java.util.Map; @@ -78,7 +78,7 @@ public class ConsumerConfig extends AbstractConfig { * <code>partition.assignment.strategy</code> */ public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy"; - private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The friendly name of the partition assignment strategy that the server will use to distribute partition ownership amongst consumer instances when group management is used"; + private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used"; /** * <code>auto.offset.reset</code> @@ -182,9 +182,8 @@ public class ConsumerConfig extends AbstractConfig { Importance.HIGH, HEARTBEAT_INTERVAL_MS_DOC) .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG, - Type.STRING, - "range", - in("range", "roundrobin"), + Type.LIST, + RangeAssignor.class.getName(), Importance.MEDIUM, PARTITION_ASSIGNMENT_STRATEGY_DOC) .define(METADATA_MAX_AGE_CONFIG, http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 2f7f153..cd166f0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -16,9 +16,10 @@ import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; -import org.apache.kafka.clients.consumer.internals.Coordinator; +import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; import org.apache.kafka.clients.consumer.internals.Fetcher; import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; @@ -43,7 +44,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -403,7 +403,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { private static final String JMX_PREFIX = "kafka.consumer"; private String clientId; - private final Coordinator coordinator; + private final ConsumerCoordinator coordinator; private final Deserializer<K> keyDeserializer; private final Deserializer<V> valueDeserializer; private final Fetcher<K, V> fetcher; @@ -416,7 +416,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { private final long retryBackoffMs; private long requestTimeoutMs; private boolean closed = false; - private Metadata.Listener metadataListener; // currentThread holds 
the threadId of the current thread accessing KafkaConsumer // and is used to prevent multi-threaded access @@ -531,11 +530,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs); OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase()); this.subscriptions = new SubscriptionState(offsetResetStrategy); - this.coordinator = new Coordinator(this.client, + List<PartitionAssignor> assignors = config.getConfiguredInstances( + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, + PartitionAssignor.class); + this.coordinator = new ConsumerCoordinator(this.client, config.getString(ConsumerConfig.GROUP_ID_CONFIG), config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), - config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), + assignors, + this.metadata, this.subscriptions, metrics, metricGrpPrefix, @@ -543,7 +546,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { this.time, requestTimeoutMs, retryBackoffMs, - new Coordinator.DefaultOffsetCommitCallback(), + new ConsumerCoordinator.DefaultOffsetCommitCallback(), config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); if (keyDeserializer == null) { @@ -652,7 +655,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { try { log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); this.subscriptions.subscribe(topics, listener); - metadata.setTopics(topics); + metadata.setTopics(subscriptions.groupSubscription()); } finally { release(); } @@ -699,22 +702,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { acquire(); try { log.debug("Subscribed to pattern: {}", pattern); - metadataListener = new Metadata.Listener() { - @Override - public void onMetadataUpdate(Cluster cluster) { - final List<String> topicsToSubscribe = new ArrayList<>(); - - for (String topic : cluster.topics()) - if (subscriptions.getSubscribedPattern().matcher(topic).matches()) - topicsToSubscribe.add(topic); - - subscriptions.changeSubscription(topicsToSubscribe); - metadata.setTopics(topicsToSubscribe); - } - }; this.subscriptions.subscribe(pattern, listener); this.metadata.needMetadataForAllTopics(true); - this.metadata.addListener(metadataListener); } finally { release(); } @@ -729,7 +718,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { this.subscriptions.unsubscribe(); this.coordinator.resetGeneration(); this.metadata.needMetadataForAllTopics(false); - this.metadata.removeListener(metadataListener); } finally { release(); } @@ -1079,12 +1067,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { try { Cluster cluster = this.metadata.fetch(); List<PartitionInfo> parts = cluster.partitionsForTopic(topic); - if (parts == null) { - metadata.add(topic); - client.awaitMetadataUpdate(); - parts = metadata.fetch().partitionsForTopic(topic); - } - return parts; + if (parts != null) + return parts; + + Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topic), requestTimeoutMs); + return topicMetadata.get(topic); } finally { release(); } @@ -1101,7 +1088,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public Map<String, List<PartitionInfo>> listTopics() { acquire(); try { - return fetcher.getAllTopics(requestTimeoutMs); + 
return fetcher.getAllTopicMetadata(requestTimeoutMs); } finally { release(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java new file mode 100644 index 0000000..f23151c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; +import org.apache.kafka.common.TopicPartition; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order + * and the consumers in lexicographic order. We then divide the number of partitions by the total number of + * consumers to determine the number of partitions to assign to each consumer. If it does not evenly + * divide, then the first few consumers will have one extra partition. + * + * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, + * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. 
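A small driver can sanity-check the arithmetic just described for the two-consumer, two-topic layout above; it uses only the assign(partitionsPerTopic, subscriptions) overload that this patch introduces and should print the assignment the javadoc lists immediately below. This is an editorial sketch, not part of the commit, and the class name RangeAssignorDemo is hypothetical. Note that partition.assignment.strategy now accepts assignor class names such as org.apache.kafka.clients.consumer.RangeAssignor, so this is the same class the consumer instantiates at runtime.

import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignorDemo {
    public static void main(String[] args) {
        // two topics with three partitions each
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("t0", 3);
        partitionsPerTopic.put("t1", 3);

        // two consumers, both subscribed to both topics
        Map<String, List<String>> subscriptions = new HashMap<>();
        subscriptions.put("C0", Arrays.asList("t0", "t1"));
        subscriptions.put("C1", Arrays.asList("t0", "t1"));

        Map<String, List<TopicPartition>> assignment =
                new RangeAssignor().assign(partitionsPerTopic, subscriptions);

        // expected per the javadoc below: C0 -> t0p0, t0p1, t1p0, t1p1; C1 -> t0p2, t1p2
        System.out.println(assignment);
    }
}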
+ * + * The assignment will be: + * C0: [t0p0, t0p1, t1p0, t1p1] + * C1: [t0p2, t1p2] + */ +public class RangeAssignor extends AbstractPartitionAssignor { + + @Override + public String name() { + return "range"; + } + + private List<TopicPartition> partitions(String topic, + int numPartitions) { + List<TopicPartition> partitions = new ArrayList<>(); + for (int i = 0; i < numPartitions; i++) + partitions.add(new TopicPartition(topic, i)); + return partitions; + } + + private Map<String, List<String>> consumersPerTopic(Map<String, List<String>> consumerMetadata) { + Map<String, List<String>> res = new HashMap<>(); + for (Map.Entry<String, List<String>> subscriptionEntry : consumerMetadata.entrySet()) { + String consumerId = subscriptionEntry.getKey(); + for (String topic : subscriptionEntry.getValue()) + put(res, topic, consumerId); + } + return res; + } + + @Override + public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, + Map<String, List<String>> subscriptions) { + Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); + Map<String, List<TopicPartition>> assignment = new HashMap<>(); + for (String memberId : subscriptions.keySet()) + assignment.put(memberId, new ArrayList<TopicPartition>()); + + for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { + String topic = topicEntry.getKey(); + List<String> consumersForTopic = topicEntry.getValue(); + + Integer numPartitionsForTopic = partitionsPerTopic.get(topic); + if (numPartitionsForTopic == null) + continue; + + Collections.sort(consumersForTopic); + + int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); + int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); + + List<TopicPartition> partitions = partitions(topic, numPartitionsForTopic); + for (int i = 0, n = consumersForTopic.size(); i < n; i++) { + int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); + int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); + assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); + } + } + return assignment; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java new file mode 100644 index 0000000..c5ea2bb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * The roundrobin assignor lays out all the available partitions and all the available consumers. It + * then proceeds to do a roundrobin assignment from partition to consumer. If the subscriptions of all consumer + * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts + * will be within a delta of exactly one across all consumers.) + * + * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, + * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. + * + * The assignment will be: + * C0: [t0p0, t0p2, t1p1] + * C1: [t0p1, t1p0, t1p2] + */ +public class RoundRobinAssignor extends AbstractPartitionAssignor { + + @Override + public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, + Map<String, List<String>> subscriptions) { + Map<String, List<TopicPartition>> assignment = new HashMap<>(); + for (String memberId : subscriptions.keySet()) + assignment.put(memberId, new ArrayList<TopicPartition>()); + + CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet())); + for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) { + final String topic = partition.topic(); + while (!subscriptions.get(assigner.peek()).contains(topic)) + assigner.next(); + assignment.get(assigner.next()).add(partition); + } + return assignment; + } + + + public List<TopicPartition> allPartitionsSorted(Map<String, Integer> partitionsPerTopic, + Map<String, List<String>> subscriptions) { + SortedSet<String> topics = new TreeSet<>(); + for (List<String> subscription : subscriptions.values()) + topics.addAll(subscription); + + List<TopicPartition> allPartitions = new ArrayList<>(); + for (String topic : topics) { + Integer partitions = partitionsPerTopic.get(topic); + for (int partition = 0; partition < partitions; partition++) { + allPartitions.add(new TopicPartition(topic, partition)); + } + } + return allPartitions; + } + + @Override + public String name() { + return "roundrobin"; + } + + private static class CircularIterator<T> implements Iterator<T> { + int i = 0; + private List<T> list; + + public CircularIterator(List<T> list) { + if (list.isEmpty()) { + throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists"); + } + this.list = list; + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + public T next() { + T next = list.get(i); + i = (i + 1) % list.size(); + return next; + } + + public T peek() { + return list.get(i); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java new file mode 100644 index 0000000..1ffd2bb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -0,0 +1,638 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Count; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.requests.GroupMetadataRequest; +import org.apache.kafka.common.requests.GroupMetadataResponse; +import org.apache.kafka.common.requests.HeartbeatRequest; +import org.apache.kafka.common.requests.HeartbeatResponse; +import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.JoinGroupResponse; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.SyncGroupRequest; +import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * AbstractCoordinator implements group management for a single group member by interacting with + * a designated Kafka broker (the coordinator). Group semantics are provided by extending this class. + * See {@link ConsumerCoordinator} for example usage. 
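To make the extension contract concrete before the protocol walkthrough continues, here is a minimal hypothetical subclass (an editorial sketch, not part of this patch): it advertises a single protocol version with empty metadata, and its leader simply echoes each member's metadata back as that member's assignment. The constructor parameters and the abstract hooks it overrides are exactly those declared further down in this file; the name EchoCoordinator is invented for illustration.

package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;

import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

public class EchoCoordinator extends AbstractCoordinator {

    public EchoCoordinator(ConsumerNetworkClient client, String groupId, int sessionTimeoutMs,
                           int heartbeatIntervalMs, Metrics metrics, String metricGrpPrefix,
                           Map<String, String> metricTags, Time time, long requestTimeoutMs,
                           long retryBackoffMs) {
        super(client, groupId, sessionTimeoutMs, heartbeatIntervalMs, metrics, metricGrpPrefix,
                metricTags, time, requestTimeoutMs, retryBackoffMs);
    }

    @Override
    protected String protocolType() {
        return "echo"; // distinguishes this family of protocols from "consumer"
    }

    @Override
    protected LinkedHashMap<String, ByteBuffer> metadata() {
        // advertise a single protocol version with no member metadata
        LinkedHashMap<String, ByteBuffer> protocols = new LinkedHashMap<>();
        protocols.put("v0", ByteBuffer.wrap(new byte[0]));
        return protocols;
    }

    @Override
    protected Map<String, ByteBuffer> doSync(String leaderId, String protocol,
                                             Map<String, ByteBuffer> allMemberMetadata) {
        // leader: every member receives its own metadata back as its assignment
        return allMemberMetadata;
    }

    @Override
    protected void onJoin(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
        // begin processing with the assignment pushed by the leader
    }

    @Override
    protected void onLeave(int generation, String memberId) {
        // clean up any state tied to the previous generation
    }
}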
+ *
+ * From a high level, Kafka's group management protocol consists of the following sequence of actions:
+ *
+ * <ol>
+ * <li>Group Registration: Group members register with the coordinator providing their own metadata
+ * (such as the set of topics they are interested in).</li>
+ * <li>Group/Leader Selection: The coordinator selects the members of the group and chooses one member
+ * as the leader.</li>
+ * <li>State Assignment: The leader collects the metadata from all the members of the group and
+ * assigns state.</li>
+ * <li>Group Stabilization: Each member receives the state assigned by the leader and begins
+ * processing.</li>
+ * </ol>
+ *
+ * To leverage this protocol, an implementation must define the format of metadata provided by each
+ * member for group registration in {@link #metadata()} and the format of the state assignment provided
+ * by the leader in {@link #doSync(String, String, Map)}, which becomes available to members in
+ * {@link #onJoin(int, String, String, ByteBuffer)}.
+ *
+ */
+public abstract class AbstractCoordinator {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractCoordinator.class);
+
+    private final Heartbeat heartbeat;
+    private final HeartbeatTask heartbeatTask;
+    private final int sessionTimeoutMs;
+    private final GroupCoordinatorMetrics sensors;
+    protected final String groupId;
+    protected final ConsumerNetworkClient client;
+    protected final Time time;
+    protected final long retryBackoffMs;
+    protected final long requestTimeoutMs;
+
+    private boolean rejoinNeeded = true;
+    protected Node coordinator;
+    protected String memberId;
+    protected String protocol;
+    protected int generation;
+
+    /**
+     * Initialize the coordination manager.
+     */
+    public AbstractCoordinator(ConsumerNetworkClient client,
+                               String groupId,
+                               int sessionTimeoutMs,
+                               int heartbeatIntervalMs,
+                               Metrics metrics,
+                               String metricGrpPrefix,
+                               Map<String, String> metricTags,
+                               Time time,
+                               long requestTimeoutMs,
+                               long retryBackoffMs) {
+        this.client = client;
+        this.time = time;
+        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
+        this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+        this.groupId = groupId;
+        this.coordinator = null;
+        this.sessionTimeoutMs = sessionTimeoutMs;
+        this.heartbeat = new Heartbeat(this.sessionTimeoutMs, heartbeatIntervalMs, time.milliseconds());
+        this.heartbeatTask = new HeartbeatTask();
+        this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.retryBackoffMs = retryBackoffMs;
+    }
+
+    /**
+     * Unique identifier for the class of protocols this coordinator implements (e.g. "consumer" or "copycat").
+     * @return Non-null protocol type name
+     */
+    protected abstract String protocolType();
+
+    /**
+     * Get the current list of protocols and their associated metadata supported
+     * by the local member. The order of the protocols in the map indicates the preference
+     * of the protocol (the first entry is the most preferred). The coordinator takes this
+     * preference into account when selecting the generation protocol (generally more preferred
+     * protocols will be selected as long as all members support them and there is no disagreement
+     * on the preference).
+     * @return Non-empty map of supported protocols and metadata
+     */
+    protected abstract LinkedHashMap<String, ByteBuffer> metadata();
+
+    /**
+     * Invoked when a group member has successfully joined a group.
+ * @param generation The generation that was joined + * @param memberId The identifier for the local member in the group + * @param protocol The protocol selected by the coordinator + * @param memberAssignment The assignment propagated from the group leader + */ + protected abstract void onJoin(int generation, + String memberId, + String protocol, + ByteBuffer memberAssignment); + + /** + * Perform synchronization for the group. This is used by the leader to push state to all the members + * of the group (e.g. to push partition assignments in the case of the new consumer) + * @param leaderId The id of the leader (which is this member) + * @param allMemberMetadata Metadata from all members of the group + * @return A map from each member to their state assignment + */ + protected abstract Map<String, ByteBuffer> doSync(String leaderId, + String protocol, + Map<String, ByteBuffer> allMemberMetadata); + + /** + * Invoked when the group is left (whether because of shutdown, metadata change, stale generation, etc.) + * @param generation The generation that was left + * @param memberId The identifier of the local member in the group + */ + protected abstract void onLeave(int generation, String memberId); + + + /** + * Block until the coordinator for this group is known. + */ + public void ensureCoordinatorKnown() { + while (coordinatorUnknown()) { + RequestFuture<Void> future = sendGroupMetadataRequest(); + client.poll(future, requestTimeoutMs); + + if (future.failed()) + client.awaitMetadataUpdate(); + } + } + + /** + * Check whether the group should be rejoined (e.g. if metadata changes) + * @return true if it should, false otherwise + */ + protected boolean needRejoin() { + return rejoinNeeded; + } + + /** + * Reset the generation/memberId tracked by this member + */ + public void resetGeneration() { + this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID; + this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; + rejoinNeeded = true; + } + + /** + * Ensure that the group is active (i.e. joined and synced) + */ + public void ensureActiveGroup() { + if (!needRejoin()) + return; + + // onLeave only invoked if we have a valid current generation + onLeave(generation, memberId); + + while (needRejoin()) { + ensureCoordinatorKnown(); + + // ensure that there are no pending requests to the coordinator. This is important + // in particular to avoid resending a pending JoinGroup request. 
+            if (client.pendingRequestCount(this.coordinator) > 0) {
+                client.awaitPendingRequests(this.coordinator);
+                continue;
+            }
+
+            RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
+            client.poll(future);
+
+            if (future.succeeded()) {
+                onJoin(generation, memberId, protocol, future.value());
+                heartbeatTask.reset();
+            } else {
+                if (future.exception() instanceof UnknownMemberIdException)
+                    continue;
+                else if (!future.isRetriable())
+                    throw future.exception();
+                Utils.sleep(retryBackoffMs);
+            }
+        }
+    }
+
+    private class HeartbeatTask implements DelayedTask {
+
+        public void reset() {
+            // start or restart the heartbeat task to be executed at the next chance
+            long now = time.milliseconds();
+            heartbeat.resetSessionTimeout(now);
+            client.unschedule(this);
+            client.schedule(this, now);
+        }
+
+        @Override
+        public void run(final long now) {
+            if (generation < 0 || needRejoin() || coordinatorUnknown()) {
+                // no need to send the heartbeat if we're not using auto-assignment or if we are
+                // awaiting a rebalance
+                return;
+            }
+
+            if (heartbeat.sessionTimeoutExpired(now)) {
+                // we haven't received a successful heartbeat in one session interval
+                // so mark the coordinator dead
+                coordinatorDead();
+                return;
+            }
+
+            if (!heartbeat.shouldHeartbeat(now)) {
+                // we don't need to heartbeat now, so reschedule for when we do
+                client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
+            } else {
+                heartbeat.sentHeartbeat(now);
+                RequestFuture<Void> future = sendHeartbeatRequest();
+                future.addListener(new RequestFutureListener<Void>() {
+                    @Override
+                    public void onSuccess(Void value) {
+                        long now = time.milliseconds();
+                        heartbeat.receiveHeartbeat(now);
+                        long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
+                        client.schedule(HeartbeatTask.this, nextHeartbeatTime);
+                    }
+
+                    @Override
+                    public void onFailure(RuntimeException e) {
+                        client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * Send a request to get a new partition assignment. This is a non-blocking call which sends
+     * a JoinGroup request to the coordinator (if it is available). The returned future must
+     * be polled to see if the request completed successfully.
+     * @return A request future whose completion indicates the result of the JoinGroup request.
+ */ + private RequestFuture<ByteBuffer> sendJoinGroupRequest() { + if (coordinatorUnknown()) + return RequestFuture.coordinatorNotAvailable(); + + // send a join group request to the coordinator + log.debug("(Re-)joining group {}", groupId); + + List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>(); + for (Map.Entry<String, ByteBuffer> metadataEntry : metadata().entrySet()) + protocols.add(new JoinGroupRequest.GroupProtocol(metadataEntry.getKey(), metadataEntry.getValue())); + + JoinGroupRequest request = new JoinGroupRequest( + groupId, + this.sessionTimeoutMs, + this.memberId, + protocolType(), + protocols); + + // create the request for the coordinator + log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id()); + return client.send(coordinator, ApiKeys.JOIN_GROUP, request) + .compose(new JoinGroupResponseHandler()); + } + + + private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> { + + @Override + public JoinGroupResponse parse(ClientResponse response) { + return new JoinGroupResponse(response.responseBody()); + } + + @Override + public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) { + // process the response + short errorCode = joinResponse.errorCode(); + if (errorCode == Errors.NONE.code()) { + log.debug("Joined group: {}", joinResponse.toStruct()); + AbstractCoordinator.this.memberId = joinResponse.memberId(); + AbstractCoordinator.this.generation = joinResponse.generationId(); + AbstractCoordinator.this.rejoinNeeded = false; + AbstractCoordinator.this.protocol = joinResponse.groupProtocol(); + sensors.joinLatency.record(response.requestLatencyMs()); + performSync(joinResponse).chain(future); + } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) { + // reset the member id and retry immediately + AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; + log.info("Attempt to join group {} failed due to unknown member id, resetting and retrying.", + groupId); + future.raise(Errors.UNKNOWN_MEMBER_ID); + } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() + || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { + // re-discover the coordinator and retry with backoff + coordinatorDead(); + log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.", + groupId); + future.raise(Errors.forCode(errorCode)); + } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code() + || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) { + // log the error and re-throw the exception + Errors error = Errors.forCode(errorCode); + log.error("Attempt to join group {} failed due to: {}", + groupId, error.exception().getMessage()); + future.raise(error); + } else { + // unexpected error, throw the exception + future.raise(new KafkaException("Unexpected error in join group response: " + + Errors.forCode(joinResponse.errorCode()).exception().getMessage())); + } + } + } + + private RequestFuture<ByteBuffer> performSync(JoinGroupResponse joinResponse) { + if (joinResponse.isLeader()) { + try { + // perform the leader synchronization and send back the assignment for the group + Map<String, ByteBuffer> groupAssignment = doSync(joinResponse.leaderId(), joinResponse.groupProtocol(), + joinResponse.members()); + + SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment); + log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, 
request, this.coordinator.id()); + return sendSyncGroupRequest(request); + } catch (RuntimeException e) { + return RequestFuture.failure(e); + } + } else { + // send follower's sync group with an empty assignment + SyncGroupRequest request = new SyncGroupRequest(groupId, generation, + memberId, Collections.<String, ByteBuffer>emptyMap()); + log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id()); + return sendSyncGroupRequest(request); + } + } + + private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest request) { + if (coordinatorUnknown()) + return RequestFuture.coordinatorNotAvailable(); + return client.send(coordinator, ApiKeys.SYNC_GROUP, request) + .compose(new SyncGroupRequestHandler()); + } + + private class SyncGroupRequestHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> { + + @Override + public SyncGroupResponse parse(ClientResponse response) { + return new SyncGroupResponse(response.responseBody()); + } + + @Override + public void handle(SyncGroupResponse syncResponse, + RequestFuture<ByteBuffer> future) { + short errorCode = syncResponse.errorCode(); + if (errorCode == Errors.NONE.code()) { + try { + future.complete(syncResponse.memberAssignment()); + sensors.syncLatency.record(response.requestLatencyMs()); + } catch (SchemaException e) { + future.raise(e); + } + } else { + AbstractCoordinator.this.rejoinNeeded = true; + future.raise(Errors.forCode(errorCode)); + } + } + } + + /** + * Discover the current coordinator for the group. Sends a GroupMetadata request to + * one of the brokers. The returned future should be polled to get the result of the request. + * @return A request future which indicates the completion of the metadata request + */ + private RequestFuture<Void> sendGroupMetadataRequest() { + // initiate the group metadata request + // find a node to ask about the coordinator + Node node = this.client.leastLoadedNode(); + if (node == null) { + // TODO: If there are no brokers left, perhaps we should use the bootstrap set + // from configuration? 
+ return RequestFuture.noBrokersAvailable(); + } else { + // create a group metadata request + log.debug("Issuing group metadata request to broker {}", node.id()); + GroupMetadataRequest metadataRequest = new GroupMetadataRequest(this.groupId); + return client.send(node, ApiKeys.GROUP_METADATA, metadataRequest) + .compose(new RequestFutureAdapter<ClientResponse, Void>() { + @Override + public void onSuccess(ClientResponse response, RequestFuture<Void> future) { + handleGroupMetadataResponse(response, future); + } + }); + } + } + + private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) { + log.debug("Group metadata response {}", resp); + + // parse the response to get the coordinator info if it is not disconnected, + // otherwise we need to request metadata update + if (resp.wasDisconnected()) { + future.raise(new DisconnectException()); + } else if (!coordinatorUnknown()) { + // We already found the coordinator, so ignore the request + future.complete(null); + } else { + GroupMetadataResponse groupMetadataResponse = new GroupMetadataResponse(resp.responseBody()); + // use MAX_VALUE - node.id as the coordinator id to mimic separate connections + // for the coordinator in the underlying network client layer + // TODO: this needs to be better handled in KAFKA-1935 + if (groupMetadataResponse.errorCode() == Errors.NONE.code()) { + this.coordinator = new Node(Integer.MAX_VALUE - groupMetadataResponse.node().id(), + groupMetadataResponse.node().host(), + groupMetadataResponse.node().port()); + + // start sending heartbeats only if we have a valid generation + if (generation > 0) + heartbeatTask.reset(); + future.complete(null); + } else { + future.raise(Errors.forCode(groupMetadataResponse.errorCode())); + } + } + } + + /** + * Check if we know who the coordinator is. + * @return true if the coordinator is unknown + */ + public boolean coordinatorUnknown() { + return this.coordinator == null; + } + + + /** + * Mark the current coordinator as dead. + */ + protected void coordinatorDead() { + if (this.coordinator != null) { + log.info("Marking the coordinator {} dead.", this.coordinator.id()); + this.coordinator = null; + } + } + + /** + * Send a heartbeat request now (visible only for testing). 
+     */
+    public RequestFuture<Void> sendHeartbeatRequest() {
+        HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId);
+        return client.send(coordinator, ApiKeys.HEARTBEAT, req)
+                .compose(new HeartbeatCompletionHandler());
+    }
+
+    private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
+        @Override
+        public HeartbeatResponse parse(ClientResponse response) {
+            return new HeartbeatResponse(response.responseBody());
+        }
+
+        @Override
+        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
+            sensors.heartbeatLatency.record(response.requestLatencyMs());
+            short error = heartbeatResponse.errorCode();
+            if (error == Errors.NONE.code()) {
+                log.debug("Received successful heartbeat response.");
+                future.complete(null);
+            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
+                    || error == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+                log.info("Attempt to heartbeat failed since coordinator is either not started or not valid, marking it as dead.");
+                coordinatorDead();
+                future.raise(Errors.forCode(error));
+            } else if (error == Errors.REBALANCE_IN_PROGRESS.code()) {
+                log.info("Attempt to heartbeat failed since the group is rebalancing, try to re-join group.");
+                AbstractCoordinator.this.rejoinNeeded = true;
+                future.raise(Errors.REBALANCE_IN_PROGRESS);
+            } else if (error == Errors.ILLEGAL_GENERATION.code()) {
+                log.info("Attempt to heartbeat failed since generation id is not legal, try to re-join group.");
+                AbstractCoordinator.this.rejoinNeeded = true;
+                future.raise(Errors.ILLEGAL_GENERATION);
+            } else if (error == Errors.UNKNOWN_MEMBER_ID.code()) {
+                log.info("Attempt to heartbeat failed since member id is not valid, reset it and try to re-join group.");
+                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
+                AbstractCoordinator.this.rejoinNeeded = true;
+                future.raise(Errors.UNKNOWN_MEMBER_ID);
+            } else {
+                future.raise(new KafkaException("Unexpected error in heartbeat response: "
+                        + Errors.forCode(error).exception().getMessage()));
+            }
+        }
+    }
+
+    protected abstract class CoordinatorResponseHandler<R, T>
+            extends RequestFutureAdapter<ClientResponse, T> {
+        protected ClientResponse response;
+
+        public abstract R parse(ClientResponse response);
+
+        public abstract void handle(R response, RequestFuture<T> future);
+
+        @Override
+        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) {
+            this.response = clientResponse;
+
+            if (clientResponse.wasDisconnected()) {
+                int correlation = response.request().request().header().correlationId();
+                log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected",
+                        response.request(),
+                        correlation,
+                        response.request().request().destination());
+
+                // mark the coordinator as dead
+                coordinatorDead();
+                future.raise(new DisconnectException());
+                return;
+            }
+
+            R response = parse(clientResponse);
+            handle(response, future);
+        }
+
+    }
+
+    private class GroupCoordinatorMetrics {
+        public final Metrics metrics;
+        public final String metricGrpName;
+
+        public final Sensor heartbeatLatency;
+        public final Sensor joinLatency;
+        public final Sensor syncLatency;
+
+        public GroupCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+            this.metrics = metrics;
+            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+            this.heartbeatLatency = metrics.sensor("heartbeat-latency");
+            this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max",
+                    this.metricGrpName,
+                    "The max time taken to receive a response to a heartbeat request",
+                    tags), new Max());
+            this.heartbeatLatency.add(new MetricName("heartbeat-rate",
+                    this.metricGrpName,
+                    "The average number of heartbeats per second",
+                    tags), new Rate(new Count()));
+
+            this.joinLatency = metrics.sensor("join-latency");
+            this.joinLatency.add(new MetricName("join-time-avg",
+                    this.metricGrpName,
+                    "The average time taken for a group rejoin",
+                    tags), new Avg());
+            this.joinLatency.add(new MetricName("join-time-max",
+                    this.metricGrpName,
+                    "The max time taken for a group rejoin",
+                    tags), new Max());
+            this.joinLatency.add(new MetricName("join-rate",
+                    this.metricGrpName,
+                    "The number of group joins per second",
+                    tags), new Rate(new Count()));
+
+            this.syncLatency = metrics.sensor("sync-latency");
+            this.syncLatency.add(new MetricName("sync-time-avg",
+                    this.metricGrpName,
+                    "The average time taken for a group sync",
+                    tags), new Avg());
+            this.syncLatency.add(new MetricName("sync-time-max",
+                    this.metricGrpName,
+                    "The max time taken for a group sync",
+                    tags), new Max());
+            this.syncLatency.add(new MetricName("sync-rate",
+                    this.metricGrpName,
+                    "The number of group syncs per second",
+                    tags), new Rate(new Count()));
+
+            Measurable lastHeartbeat =
+                new Measurable() {
+                    public double measure(MetricConfig config, long now) {
+                        return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
+                    }
+                };
+            metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
+                    this.metricGrpName,
+                    "The number of seconds since the last coordinator heartbeat",
+                    tags),
+                lastHeartbeat);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
new file mode 100644
index 0000000..12fa913
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Abstract assignor implementation which does some common grunt work (in particular collecting
+ * partition counts which are always needed in assignors).
+ */
+public abstract class AbstractPartitionAssignor implements PartitionAssignor {
+    private static final Logger log = LoggerFactory.getLogger(AbstractPartitionAssignor.class);
+
+    /**
+     * Perform the group assignment given the partition counts and member subscriptions
+     * @param partitionsPerTopic The number of partitions for each subscribed topic (may be empty for some topics)
+     * @param subscriptions Map from the memberId to their respective topic subscription
+     * @return Map from each member to the list of partitions assigned to them
+     */
+    public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+                                                             Map<String, List<String>> subscriptions);
+
+    @Override
+    public Subscription subscription(Set<String> topics) {
+        return new Subscription(new ArrayList<>(topics));
+    }
+
+    @Override
+    public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+        Set<String> allSubscribedTopics = new HashSet<>();
+        Map<String, List<String>> topicSubscriptions = new HashMap<>();
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            List<String> topics = subscriptionEntry.getValue().topics();
+            allSubscribedTopics.addAll(topics);
+            topicSubscriptions.put(subscriptionEntry.getKey(), topics);
+        }
+
+        Map<String, Integer> partitionsPerTopic = new HashMap<>();
+        for (String topic : allSubscribedTopics) {
+            Integer numPartitions = metadata.partitionCountForTopic(topic);
+            if (numPartitions != null)
+                partitionsPerTopic.put(topic, numPartitions);
+            else
+                log.debug("Skipping assignment for topic {} since no metadata is available", topic);
+        }
+
+        Map<String, List<TopicPartition>> rawAssignments = assign(partitionsPerTopic, topicSubscriptions);
+
+        // this class maintains no user data, so just wrap the results
+        Map<String, Assignment> assignments = new HashMap<>();
+        for (Map.Entry<String, List<TopicPartition>> assignmentEntry : rawAssignments.entrySet())
+            assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));
+        return assignments;
+    }
+
+    @Override
+    public void onAssignment(Assignment assignment) {
+        // this assignor maintains no internal state, so nothing to do
+    }
+
+    protected static <K, V> void put(Map<K, List<V>> map, K key, V value) {
+        List<V> list = map.get(key);
+        if (list == null) {
+            list = new ArrayList<>();
+            map.put(key, list);
+        }
+        list.add(value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
new file mode 100644
index 0000000..fc7e819
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the
Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerWakeupException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Count; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.OffsetFetchRequest; +import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * This class manages the coordination process with the consumer coordinator. + */ +public final class ConsumerCoordinator extends AbstractCoordinator implements Closeable { + + private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class); + + private final Map<String, PartitionAssignor> protocolMap; + private final org.apache.kafka.clients.Metadata metadata; + private final MetadataSnapshot metadataSnapshot; + private final ConsumerCoordinatorMetrics sensors; + private final SubscriptionState subscriptions; + private final OffsetCommitCallback defaultOffsetCommitCallback; + private final boolean autoCommitEnabled; + + /** + * Initialize the coordination manager. 
+ */ + public ConsumerCoordinator(ConsumerNetworkClient client, + String groupId, + int sessionTimeoutMs, + int heartbeatIntervalMs, + List<PartitionAssignor> assignors, + Metadata metadata, + SubscriptionState subscriptions, + Metrics metrics, + String metricGrpPrefix, + Map<String, String> metricTags, + Time time, + long requestTimeoutMs, + long retryBackoffMs, + OffsetCommitCallback defaultOffsetCommitCallback, + boolean autoCommitEnabled, + long autoCommitIntervalMs) { + super(client, + groupId, + sessionTimeoutMs, + heartbeatIntervalMs, + metrics, + metricGrpPrefix, + metricTags, + time, + requestTimeoutMs, + retryBackoffMs); + this.metadata = metadata; + + this.metadata.requestUpdate(); + this.metadataSnapshot = new MetadataSnapshot(); + this.subscriptions = subscriptions; + this.defaultOffsetCommitCallback = defaultOffsetCommitCallback; + this.autoCommitEnabled = autoCommitEnabled; + + this.protocolMap = new HashMap<>(); + for (PartitionAssignor assignor : assignors) + this.protocolMap.put(assignor.name(), assignor); + + addMetadataListener(); + + if (autoCommitEnabled) + scheduleAutoCommitTask(autoCommitIntervalMs); + + this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); + } + + @Override + public String protocolType() { + return "consumer"; + } + + @Override + public LinkedHashMap<String, ByteBuffer> metadata() { + LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>(); + for (PartitionAssignor assignor : protocolMap.values()) { + Subscription subscription = assignor.subscription(subscriptions.subscription()); + metadata.put(assignor.name(), ConsumerProtocol.serializeSubscription(subscription)); + } + return metadata; + } + + private void addMetadataListener() { + this.metadata.addListener(new Metadata.Listener() { + @Override + public void onMetadataUpdate(Cluster cluster) { + if (subscriptions.hasPatternSubscription()) { + final List<String> topicsToSubscribe = new ArrayList<>(); + + for (String topic : cluster.topics()) + if (subscriptions.getSubscribedPattern().matcher(topic).matches()) + topicsToSubscribe.add(topic); + + subscriptions.changeSubscription(topicsToSubscribe); + metadata.setTopics(subscriptions.groupSubscription()); + } + + // check if there are any changes to the metadata which should trigger a rebalance + if (metadataSnapshot.update(subscriptions, cluster) && subscriptions.partitionsAutoAssigned()) + subscriptions.needReassignment(); + } + }); + } + + @Override + protected void onJoin(int generation, + String memberId, + String assignmentStrategy, + ByteBuffer assignmentBuffer) { + PartitionAssignor assignor = protocolMap.get(assignmentStrategy); + if (assignor == null) + throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy); + + Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer); + + // set the flag to refresh last committed offsets + subscriptions.needRefreshCommits(); + + // update partition assignment + subscriptions.changePartitionAssignment(assignment.partitions()); + + // give the assignor a chance to update internal state based on the received assignment + assignor.onAssignment(assignment); + + // execute the user's callback after rebalance + ConsumerRebalanceListener listener = subscriptions.listener(); + log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions()); + try { + Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions()); + listener.onPartitionsAssigned(assigned); + } 
+
+    @Override
+    protected void onJoin(int generation,
+                          String memberId,
+                          String assignmentStrategy,
+                          ByteBuffer assignmentBuffer) {
+        PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
+        if (assignor == null)
+            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
+
+        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
+
+        // set the flag to refresh last committed offsets
+        subscriptions.needRefreshCommits();
+
+        // update partition assignment
+        subscriptions.changePartitionAssignment(assignment.partitions());
+
+        // give the assignor a chance to update internal state based on the received assignment
+        assignor.onAssignment(assignment);
+
+        // execute the user's callback after rebalance
+        ConsumerRebalanceListener listener = subscriptions.listener();
+        log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
+        try {
+            Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
+            listener.onPartitionsAssigned(assigned);
+        } catch (Exception e) {
+            log.error("User provided listener " + listener.getClass().getName()
+                    + " failed on partition assignment: ", e);
+        }
+    }
+
+    @Override
+    protected Map<String, ByteBuffer> doSync(String leaderId,
+                                             String assignmentStrategy,
+                                             Map<String, ByteBuffer> allSubscriptions) {
+        PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
+        if (assignor == null)
+            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
+
+        Set<String> allSubscribedTopics = new HashSet<>();
+        Map<String, Subscription> subscriptions = new HashMap<>();
+        for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
+            Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
+            subscriptions.put(subscriptionEntry.getKey(), subscription);
+            allSubscribedTopics.addAll(subscription.topics());
+        }
+
+        // the leader will begin watching for changes to any of the topics the group is interested in,
+        // which ensures that all metadata changes will eventually be seen
+        this.subscriptions.groupSubscribe(allSubscribedTopics);
+        metadata.setTopics(this.subscriptions.groupSubscription());
+        client.ensureFreshMetadata();
+
+        log.debug("Performing {} assignment for subscriptions {}", assignor.name(), subscriptions);
+
+        Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
+
+        log.debug("Finished assignment: {}", assignment);
+
+        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
+        for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
+            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
+            groupAssignment.put(assignmentEntry.getKey(), buffer);
+        }
+
+        return groupAssignment;
+    }
+
+    @Override
+    protected void onLeave(int generation, String memberId) {
+        // commit offsets prior to rebalance if auto-commit enabled
+        maybeAutoCommitOffsetsSync();
+
+        // execute the user's callback before rebalance
+        ConsumerRebalanceListener listener = subscriptions.listener();
+        log.debug("Revoking previously assigned partitions {}", subscriptions.assignedPartitions());
+        try {
+            Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
+            listener.onPartitionsRevoked(revoked);
+        } catch (Exception e) {
+            log.error("User provided listener " + listener.getClass().getName()
+                    + " failed on partition revocation: ", e);
+        }
+
+        subscriptions.needReassignment();
+    }
+
+    @Override
+    public boolean needRejoin() {
+        return subscriptions.partitionsAutoAssigned() &&
+                (super.needRejoin() || subscriptions.partitionAssignmentNeeded());
+    }
+
+    /**
+     * Refresh the committed offsets for provided partitions.
+     */
+    public void refreshCommittedOffsetsIfNeeded() {
+        if (subscriptions.refreshCommitsNeeded()) {
+            Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                // verify assignment is still active
+                if (subscriptions.isAssigned(tp))
+                    this.subscriptions.committed(tp, entry.getValue());
+            }
+            this.subscriptions.commitsRefreshed();
+        }
+    }
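
In doSync above, only the group leader computes assignments: it deserializes each member's Subscription, runs the chosen assignor, and returns one serialized Assignment per member for the coordinator to distribute. As a rough sketch of what a round-robin style of distribution produces; member and partition names are hypothetical, and plain strings stand in for TopicPartition:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class RoundRobinSketch {
        public static void main(String[] args) {
            // hypothetical members and partitions, for illustration only
            List<String> members = Arrays.asList("consumer-1", "consumer-2");
            List<String> partitions = Arrays.asList("t-0", "t-1", "t-2", "t-3", "t-4");

            // hand out partitions to members in round-robin order, roughly
            // what a round-robin assignor does with real TopicPartitions
            Map<String, List<String>> assignment = new TreeMap<>();
            for (String member : members)
                assignment.put(member, new ArrayList<String>());

            int i = 0;
            for (String partition : partitions)
                assignment.get(members.get(i++ % members.size())).add(partition);

            // prints {consumer-1=[t-0, t-2, t-4], consumer-2=[t-1, t-3]}
            System.out.println(assignment);
        }
    }
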
+
+    /**
+     * Fetch the current committed offsets from the coordinator for a set of partitions.
+     * @param partitions The partitions to fetch offsets for
+     * @return A map from partition to the committed offset
+     */
+    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
+        while (true) {
+            ensureCoordinatorKnown();
+
+            // contact coordinator to fetch committed offsets
+            RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
+            client.poll(future);
+
+            if (future.succeeded())
+                return future.value();
+
+            if (!future.isRetriable())
+                throw future.exception();
+
+            Utils.sleep(retryBackoffMs);
+        }
+    }
+
+    /**
+     * Ensure that we have a valid partition assignment from the coordinator.
+     */
+    public void ensurePartitionAssignment() {
+        if (subscriptions.partitionsAutoAssigned())
+            ensureActiveGroup();
+    }
+
+    @Override
+    public void close() {
+        // commit offsets prior to closing if auto-commit enabled
+        while (true) {
+            try {
+                maybeAutoCommitOffsetsSync();
+                return;
+            } catch (ConsumerWakeupException e) {
+                // ignore wakeups while closing to ensure we have a chance to commit
+                continue;
+            }
+        }
+    }
+
+    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+        this.subscriptions.needRefreshCommits();
+        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
+        final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
+        future.addListener(new RequestFutureListener<Void>() {
+            @Override
+            public void onSuccess(Void value) {
+                cb.onComplete(offsets, null);
+            }
+
+            @Override
+            public void onFailure(RuntimeException e) {
+                cb.onComplete(offsets, e);
+            }
+        });
+    }
+
+    public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (offsets.isEmpty())
+            return;
+
+        while (true) {
+            ensureCoordinatorKnown();
+
+            RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
+            client.poll(future);
+
+            if (future.succeeded()) {
+                return;
+            }
+
+            if (!future.isRetriable()) {
+                throw future.exception();
+            }
+
+            Utils.sleep(retryBackoffMs);
+        }
+    }
+
+    private void scheduleAutoCommitTask(final long interval) {
+        DelayedTask task = new DelayedTask() {
+            public void run(long now) {
+                commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
+                    @Override
+                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                        if (exception != null)
+                            log.error("Auto offset commit failed.", exception);
+                    }
+                });
+                client.schedule(this, now + interval);
+            }
+        };
+        client.schedule(task, time.milliseconds() + interval);
+    }
+
+    private void maybeAutoCommitOffsetsSync() {
+        if (autoCommitEnabled) {
+            try {
+                commitOffsetsSync(subscriptions.allConsumed());
+            } catch (ConsumerWakeupException e) {
+                // rethrow wakeups since they are triggered by the user
+                throw e;
+            } catch (Exception e) {
+                // consistent with async auto-commit failures, we do not propagate the exception
+                log.error("Auto offset commit failed.", e);
+            }
+        }
+    }
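
commitOffsetsSync and fetchCommittedOffsets share one blocking pattern: locate the coordinator, send the request, poll the future to completion, return on success, rethrow non-retriable failures, and back off before retrying. A generic sketch of that loop under assumed names; SketchFuture, sendAndPoll, and RETRY_BACKOFF_MS are placeholders, not Kafka APIs:

    // Shape of the synchronous retry loop shared by commitOffsetsSync and
    // fetchCommittedOffsets; the names here are stand-ins, not Kafka APIs.
    public abstract class SyncRetrySketch<T> {

        private static final long RETRY_BACKOFF_MS = 100L; // placeholder value

        // one attempt: locate the coordinator, send the request, poll to completion
        protected abstract SketchFuture<T> sendAndPoll();

        public T pollUntilDone() throws InterruptedException {
            while (true) {
                SketchFuture<T> future = sendAndPoll();
                if (future.succeeded())
                    return future.value();        // done
                if (!future.isRetriable())
                    throw future.exception();     // fatal: propagate to the caller
                Thread.sleep(RETRY_BACKOFF_MS);   // transient: back off and retry
            }
        }

        public interface SketchFuture<T> {
            boolean succeeded();
            boolean isRetriable();
            T value();
            RuntimeException exception();
        }
    }
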
+
+    /**
+     * Commit offsets for the specified list of topics and partitions. This is a non-blocking call
+     * which returns a request future that can be polled in the case of a synchronous commit or ignored in the
+     * asynchronous case.
+     *
+     * @param offsets The list of offsets per partition that should be committed.
+     * @return A request future whose value indicates whether the commit was successful or not
+     */
+    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (coordinatorUnknown())
+            return RequestFuture.coordinatorNotAvailable();
+
+        if (offsets.isEmpty())
+            return RequestFuture.voidSuccess();
+
+        // create the offset commit request
+        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+            OffsetAndMetadata offsetAndMetadata = entry.getValue();
+            offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
+                    offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
+        }
+
+        OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
+                this.generation,
+                this.memberId,
+                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
+                offsetData);
+
+        return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
+                .compose(new OffsetCommitResponseHandler(offsets));
+    }
+
+    public static class DefaultOffsetCommitCallback implements OffsetCommitCallback {
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+            if (exception != null)
+                log.error("Offset commit failed.", exception);
+        }
+    }
+
+    private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
+
+        private final Map<TopicPartition, OffsetAndMetadata> offsets;
+
+        public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
+            this.offsets = offsets;
+        }
+
+        @Override
+        public OffsetCommitResponse parse(ClientResponse response) {
+            return new OffsetCommitResponse(response.responseBody());
+        }
+
+        @Override
+        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
+            sensors.commitLatency.record(response.requestLatencyMs());
+            for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
+                long offset = offsetAndMetadata.offset();
+
+                short errorCode = entry.getValue();
+                if (errorCode == Errors.NONE.code()) {
+                    log.debug("Committed offset {} for partition {}", offset, tp);
+                    if (subscriptions.isAssigned(tp))
+                        // update the local cache only if the partition is still assigned
+                        subscriptions.committed(tp, offsetAndMetadata);
+                } else {
+                    if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
+                            || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+                        coordinatorDead();
+                    } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()
+                            || errorCode == Errors.ILLEGAL_GENERATION.code()) {
+                        // need to re-join group
+                        subscriptions.needReassignment();
+                    }
+
+                    log.error("Error committing partition {} at offset {}: {}",
+                            tp,
+                            offset,
+                            Errors.forCode(errorCode).exception().getMessage());
+
+                    future.raise(Errors.forCode(errorCode));
+                    return;
+                }
+            }
+
+            future.complete(null);
+        }
+    }
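
The commit response handler sorts errors into three buckets: coordinator errors mark the coordinator dead so it gets rediscovered, membership and generation errors force a rejoin, and anything else is raised on the future. A compressed sketch of that dispatch, using a stand-in enum rather than the real Errors codes:

    public class CommitErrorDispatchSketch {

        // stand-ins for the error codes handled by OffsetCommitResponseHandler
        enum SketchError { NONE, GROUP_COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR_FOR_GROUP,
                           UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION, OTHER }

        static String reactTo(SketchError error) {
            switch (error) {
                case NONE:
                    return "update local committed-offset cache";
                case GROUP_COORDINATOR_NOT_AVAILABLE:
                case NOT_COORDINATOR_FOR_GROUP:
                    return "mark coordinator dead and rediscover it";
                case UNKNOWN_MEMBER_ID:
                case ILLEGAL_GENERATION:
                    return "trigger reassignment (rejoin the group)";
                default:
                    return "raise the error on the commit future";
            }
        }

        public static void main(String[] args) {
            for (SketchError e : SketchError.values())
                System.out.println(e + " -> " + reactTo(e));
        }
    }
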
+
+    /**
+     * Fetch the committed offsets for a set of partitions. This is a non-blocking call. The
+     * returned future can be polled to get the actual offsets returned from the broker.
+     *
+     * @param partitions The set of partitions to get offsets for.
+     * @return A request future containing the committed offsets.
+     */
+    private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
+        if (coordinatorUnknown())
+            return RequestFuture.coordinatorNotAvailable();
+
+        log.debug("Fetching committed offsets for partitions: {}", Utils.join(partitions, ", "));
+        // construct the request
+        OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
+
+        // send the request with a callback
+        return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
+                .compose(new OffsetFetchResponseHandler());
+    }
+
+    private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
+
+        @Override
+        public OffsetFetchResponse parse(ClientResponse response) {
+            return new OffsetFetchResponse(response.responseBody());
+        }
+
+        @Override
+        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
+            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
+                TopicPartition tp = entry.getKey();
+                OffsetFetchResponse.PartitionData data = entry.getValue();
+                if (data.hasError()) {
+                    log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
+                            .exception()
+                            .getMessage());
+                    if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
+                        // just retry
+                        future.raise(Errors.OFFSET_LOAD_IN_PROGRESS);
+                    } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+                        // re-discover the coordinator and retry
+                        coordinatorDead();
+                        future.raise(Errors.NOT_COORDINATOR_FOR_GROUP);
+                    } else if (data.errorCode == Errors.UNKNOWN_MEMBER_ID.code()
+                            || data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
+                        // need to re-join group
+                        subscriptions.needReassignment();
+                        future.raise(Errors.forCode(data.errorCode));
+                    } else {
+                        future.raise(new KafkaException("Unexpected error in fetch offset response: "
+                                + Errors.forCode(data.errorCode).exception().getMessage()));
+                    }
+                    return;
+                } else if (data.offset >= 0) {
+                    // record the position with the offset (-1 indicates no committed offset to fetch)
+                    offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
+                } else {
+                    log.debug("No committed offset for partition " + tp);
+                }
+            }
+
+            future.complete(offsets);
+        }
+    }
+
+    private class ConsumerCoordinatorMetrics {
+        public final Metrics metrics;
+        public final String metricGrpName;
+
+        public final Sensor commitLatency;
+
+        public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
+            this.metrics = metrics;
+            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
+
+            this.commitLatency = metrics.sensor("commit-latency");
+            this.commitLatency.add(new MetricName("commit-latency-avg",
+                this.metricGrpName,
+                "The average time taken for a commit request",
+                tags), new Avg());
+            this.commitLatency.add(new MetricName("commit-latency-max",
+                this.metricGrpName,
+                "The max time taken for a commit request",
+                tags), new Max());
+            this.commitLatency.add(new MetricName("commit-rate",
+                this.metricGrpName,
+                "The number of commit calls per second",
+                tags), new Rate(new Count()));
+
+            Measurable numParts =
+                new Measurable() {
+                    public double measure(MetricConfig config, long now) {
+                        return subscriptions.assignedPartitions().size();
+                    }
+                };
+            metrics.addMetric(new MetricName("assigned-partitions",
+                this.metricGrpName,
+                "The number of partitions currently assigned to this consumer",
+                tags),
+                numParts);
+        }
+    }
+
+    private static class MetadataSnapshot {
+        private Map<String, Integer> partitionsPerTopic = new HashMap<>();
+
+        public boolean update(SubscriptionState subscription, Cluster cluster) {
+            Map<String, Integer> partitionsPerTopic = new HashMap<>();
+            for (String topic : subscription.groupSubscription())
+                partitionsPerTopic.put(topic, cluster.partitionCountForTopic(topic));
+
+            if (!partitionsPerTopic.equals(this.partitionsPerTopic)) {
+                this.partitionsPerTopic = partitionsPerTopic;
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+
+}
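
MetadataSnapshot reduces each metadata update to a single question: did the partition count of any subscribed topic change since the last snapshot? The same compare-then-swap-the-snapshot pattern in isolation; the topic name and counts are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    public class SnapshotDiffSketch {
        private Map<String, Integer> partitionsPerTopic = new HashMap<>();

        // returns true only when the observed counts differ from the last snapshot
        public boolean update(Map<String, Integer> observed) {
            if (!observed.equals(this.partitionsPerTopic)) {
                this.partitionsPerTopic = new HashMap<>(observed);
                return true; // a change like this is what triggers a rebalance
            }
            return false;
        }

        public static void main(String[] args) {
            SnapshotDiffSketch snapshot = new SnapshotDiffSketch();
            Map<String, Integer> counts = new HashMap<>();
            counts.put("my-topic", 3);                   // hypothetical topic
            System.out.println(snapshot.update(counts)); // true: first observation
            System.out.println(snapshot.update(counts)); // false: nothing changed
            counts.put("my-topic", 4);                   // a partition was added
            System.out.println(snapshot.update(counts)); // true: would trigger rebalance
        }
    }
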
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4153eb3..fbfe54a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -127,6 +127,15 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     /**
+     * Ensure our metadata is fresh (if an update is expected, this will block
+     * until it has completed).
+     */
+    public void ensureFreshMetadata() {
+        if (this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
+            awaitMetadataUpdate();
+    }
+
+    /**
      * Wakeup an active poll. This will cause the polling thread to throw an exception either
      * on the current poll if one is active, or the next poll.
      */
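
The new ensureFreshMetadata only blocks when an update is already due, i.e. when timeToNextUpdate returns 0; otherwise it is a no-op. A stand-in sketch of that conditional check; the class and timings here are illustrative, not the real Metadata implementation:

    public class FreshMetadataSketch {
        private final long maxAgeMs;
        private long lastUpdateMs;

        public FreshMetadataSketch(long maxAgeMs, long nowMs) {
            this.maxAgeMs = maxAgeMs;
            this.lastUpdateMs = nowMs;
        }

        // 0 means an update is due right now, mirroring Metadata.timeToNextUpdate
        public long timeToNextUpdate(long nowMs) {
            return Math.max(0, lastUpdateMs + maxAgeMs - nowMs);
        }

        // mirrors ensureFreshMetadata: act only when an update is already due
        public void ensureFresh(long nowMs) {
            if (timeToNextUpdate(nowMs) == 0)
                lastUpdateMs = nowMs; // the real client blocks in awaitMetadataUpdate()
        }

        public static void main(String[] args) {
            FreshMetadataSketch metadata = new FreshMetadataSketch(5 * 60 * 1000L, 0L);
            System.out.println(metadata.timeToNextUpdate(1000L));   // 299000: no-op case
            metadata.ensureFresh(300000L);                          // due: would block here
            System.out.println(metadata.timeToNextUpdate(300000L)); // 300000 after refresh
        }
    }
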
