Repository: kafka Updated Branches: refs/heads/0.10.1 ecb51680a -> 68e7af812
Cherry-pick KAFKA-4355: Skip topics that have no partitions, by Eno Thereska Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68e7af81 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68e7af81 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68e7af81 Branch: refs/heads/0.10.1 Commit: 68e7af812b94494e5798f3644b569e454036ac12 Parents: ecb5168 Author: Guozhang Wang <[email protected]> Authored: Wed Nov 23 21:33:33 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 23 21:33:33 2016 -0800 ---------------------------------------------------------------------- .../streams/processor/DefaultPartitionGrouper.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/68e7af81/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index f0fb38c..554f7d3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -58,7 +58,8 @@ public class DefaultPartitionGrouper implements PartitionGrouper { Set<TopicPartition> group = new HashSet<>(topicGroup.size()); for (String topic : topicGroup) { - if (partitionId < metadata.partitionsForTopic(topic).size()) { + List<PartitionInfo> partitions = metadata.partitionsForTopic(topic); + if (partitions != null && partitionId < partitions.size()) { group.add(new TopicPartition(topic, partitionId)); } } @@ -77,12 +78,11 @@ public class DefaultPartitionGrouper implements PartitionGrouper { for (String topic : topics) { List<PartitionInfo> partitions = metadata.partitionsForTopic(topic); - if (partitions == null) - throw new StreamsException("Topic not found during partition assignment: " + topic); - - int numPartitions = partitions.size(); - if (numPartitions > maxNumPartitions) - maxNumPartitions = numPartitions; + if (partitions != null) { + int numPartitions = partitions.size(); + if (numPartitions > maxNumPartitions) + maxNumPartitions = numPartitions; + } } return maxNumPartitions; }
