Repository: kafka Updated Branches: refs/heads/trunk a30491ac5 -> aa775a199
kafka-1609; New producer metadata response handling should only exclude a PartitionInfo when its error is LEADER_NOT_AVAILABLE; patched by Dong Lin; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa775a19 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa775a19 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa775a19 Branch: refs/heads/trunk Commit: aa775a199edbdbb4bcb1b3ac8f75d7e5c80fcaee Parents: a30491a Author: Dong Lin <[email protected]> Authored: Fri Aug 22 17:05:33 2014 -0700 Committer: Jun Rao <[email protected]> Committed: Fri Aug 22 17:05:33 2014 -0700 ---------------------------------------------------------------------- .../kafka/common/requests/MetadataResponse.java | 27 +++++++++----------- 1 file changed, 12 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/aa775a19/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 7d90fce..d97962d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -117,21 +117,18 @@ public class MetadataResponse extends AbstractRequestResponse { Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME); for (int j = 0; j < partitionInfos.length; j++) { Struct partitionInfo = (Struct) partitionInfos[j]; - short partError = partitionInfo.getShort(PARTITION_ERROR_CODE_KEY_NAME); - if (partError == Errors.NONE.code()) { - int partition = partitionInfo.getInt(PARTITION_KEY_NAME); - int leader = partitionInfo.getInt(LEADER_KEY_NAME); - Node leaderNode = leader == -1 ? null : brokers.get(leader); - Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); - Node[] replicaNodes = new Node[replicas.length]; - for (int k = 0; k < replicas.length; k++) - replicaNodes[k] = brokers.get(replicas[k]); - Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME); - Node[] isrNodes = new Node[isr.length]; - for (int k = 0; k < isr.length; k++) - isrNodes[k] = brokers.get(isr[k]); - partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes)); - } + int partition = partitionInfo.getInt(PARTITION_KEY_NAME); + int leader = partitionInfo.getInt(LEADER_KEY_NAME); + Node leaderNode = leader == -1 ? null : brokers.get(leader); + Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME); + Node[] replicaNodes = new Node[replicas.length]; + for (int k = 0; k < replicas.length; k++) + replicaNodes[k] = brokers.get(replicas[k]); + Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME); + Node[] isrNodes = new Node[isr.length]; + for (int k = 0; k < isr.length; k++) + isrNodes[k] = brokers.get(isr[k]); + partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes)); } } else { errors.put(topic, Errors.forCode(topicError));
