http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index fa88ac1..f50da82 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -9,185 +9,174 @@ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. -*/ + */ package org.apache.kafka.clients.consumer; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; import java.util.ArrayList; -import java.util.Collection; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; + +import org.apache.kafka.clients.consumer.internals.SubscriptionState; import org.apache.kafka.common.Metric; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.MetricName; /** - * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. - * This class is <i> not threadsafe </i> + * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not + * threadsafe </i> * <p> - * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it - * needs to communicate with. Failure to close the consumer after use will leak these resources. + * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it needs to + * communicate with. Failure to close the consumer after use will leak these resources. */ -public class MockConsumer implements Consumer<byte[], byte[]> { +public class MockConsumer<K, V> implements Consumer<K, V> { + + private final Map<String, List<PartitionInfo>> partitions; + private final SubscriptionState subscriptions; + private Map<TopicPartition, List<ConsumerRecord<K, V>>> records; + private boolean closed; - private final Set<TopicPartition> subscribedPartitions; - private final Set<String> subscribedTopics; - private final Map<TopicPartition, Long> committedOffsets; - private final Map<TopicPartition, Long> consumedOffsets; - public MockConsumer() { - subscribedPartitions = new HashSet<TopicPartition>(); - subscribedTopics = new HashSet<String>(); - committedOffsets = new HashMap<TopicPartition, Long>(); - consumedOffsets = new HashMap<TopicPartition, Long>(); + this.subscriptions = new SubscriptionState(); + this.partitions = new HashMap<String, List<PartitionInfo>>(); + this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>(); + this.closed = false; } @Override - public void subscribe(String... topics) { - if(subscribedPartitions.size() > 0) - throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); - for(String topic : topics) { - subscribedTopics.add(topic); - } + public synchronized Set<TopicPartition> subscriptions() { + return this.subscriptions.assignedPartitions(); } @Override - public void subscribe(TopicPartition... partitions) { - if(subscribedTopics.size() > 0) - throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); - for(TopicPartition partition : partitions) { - subscribedPartitions.add(partition); - consumedOffsets.put(partition, 0L); - } + public synchronized void subscribe(String... topics) { + ensureNotClosed(); + for (String topic : topics) + this.subscriptions.subscribe(topic); } - public void unsubscribe(String... topics) { - // throw an exception if the topic was never subscribed to - for(String topic:topics) { - if(!subscribedTopics.contains(topic)) - throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" + - " to unsubscribe(" + topic + ")"); - subscribedTopics.remove(topic); - } + @Override + public synchronized void subscribe(TopicPartition... partitions) { + ensureNotClosed(); + for (TopicPartition partition : partitions) + this.subscriptions.subscribe(partition); } - public void unsubscribe(TopicPartition... partitions) { - // throw an exception if the partition was never subscribed to - for(TopicPartition partition:partitions) { - if(!subscribedPartitions.contains(partition)) - throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" + - partition.topic() + "," + partition.partition() + ") should be called prior" + - " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")"); - subscribedPartitions.remove(partition); - committedOffsets.remove(partition); - consumedOffsets.remove(partition); - } + public synchronized void unsubscribe(String... topics) { + ensureNotClosed(); + for (String topic : topics) + this.subscriptions.unsubscribe(topic); + } + + public synchronized void unsubscribe(TopicPartition... partitions) { + ensureNotClosed(); + for (TopicPartition partition : partitions) + this.subscriptions.unsubscribe(partition); } @Override - public Map<String, ConsumerRecords<byte[], byte[]>> poll(long timeout) { - // hand out one dummy record, 1 per topic - Map<String, List<ConsumerRecord>> records = new HashMap<String, List<ConsumerRecord>>(); - Map<String, ConsumerRecords<byte[], byte[]>> recordMetadata = new HashMap<String, ConsumerRecords<byte[], byte[]>>(); - for(TopicPartition partition : subscribedPartitions) { - // get the last consumed offset - long messageSequence = consumedOffsets.get(partition); - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - ObjectOutputStream outputStream; - try { - outputStream = new ObjectOutputStream(byteStream); - outputStream.writeLong(messageSequence++); - outputStream.close(); - } catch (IOException e) { - e.printStackTrace(); - } - List<ConsumerRecord> recordsForTopic = records.get(partition.topic()); - if(recordsForTopic == null) { - recordsForTopic = new ArrayList<ConsumerRecord>(); - records.put(partition.topic(), recordsForTopic); - } - recordsForTopic.add(new ConsumerRecord(partition.topic(), partition.partition(), null, byteStream.toByteArray(), messageSequence)); - consumedOffsets.put(partition, messageSequence); + public synchronized ConsumerRecords<K, V> poll(long timeout) { + ensureNotClosed(); + // update the consumed offset + for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) { + List<ConsumerRecord<K, V>> recs = entry.getValue(); + if (!recs.isEmpty()) + this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset()); } - for(Entry<String, List<ConsumerRecord>> recordsPerTopic : records.entrySet()) { - Map<Integer, List<ConsumerRecord>> recordsPerPartition = new HashMap<Integer, List<ConsumerRecord>>(); - for(ConsumerRecord record : recordsPerTopic.getValue()) { - List<ConsumerRecord> recordsForThisPartition = recordsPerPartition.get(record.partition()); - if(recordsForThisPartition == null) { - recordsForThisPartition = new ArrayList<ConsumerRecord>(); - recordsPerPartition.put(record.partition(), recordsForThisPartition); - } - recordsForThisPartition.add(record); - } - recordMetadata.put(recordsPerTopic.getKey(), new ConsumerRecords(recordsPerTopic.getKey(), recordsPerPartition)); + + ConsumerRecords<K, V> copy = new ConsumerRecords<K, V>(this.records); + this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>(); + return copy; + } + + public synchronized void addRecord(ConsumerRecord<K, V> record) { + ensureNotClosed(); + TopicPartition tp = new TopicPartition(record.topic(), record.partition()); + this.subscriptions.assignedPartitions().add(tp); + List<ConsumerRecord<K, V>> recs = this.records.get(tp); + if (recs == null) { + recs = new ArrayList<ConsumerRecord<K, V>>(); + this.records.put(tp, recs); } - return recordMetadata; + recs.add(record); } @Override - public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync) { - if(!sync) - return null; - for(Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) { - committedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); - } - return new OffsetMetadata(committedOffsets, null); + public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) { + ensureNotClosed(); + for (Entry<TopicPartition, Long> entry : offsets.entrySet()) + subscriptions.committed(entry.getKey(), entry.getValue()); } @Override - public OffsetMetadata commit(boolean sync) { - if(!sync) - return null; - return commit(consumedOffsets, sync); + public synchronized void commit(CommitType commitType) { + ensureNotClosed(); + commit(this.subscriptions.allConsumed(), commitType); } @Override - public void seek(Map<TopicPartition, Long> offsets) { - // change the fetch offsets - for(Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) { - consumedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); - } + public synchronized void seek(TopicPartition partition, long offset) { + ensureNotClosed(); + subscriptions.seek(partition, offset); } @Override - public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions) { - Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(); - for(TopicPartition partition : partitions) { - offsets.put(new TopicPartition(partition.topic(), partition.partition()), committedOffsets.get(partition)); - } - return offsets; + public synchronized long committed(TopicPartition partition) { + ensureNotClosed(); + return subscriptions.committed(partition); } @Override - public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions) { - Map<TopicPartition, Long> positions = new HashMap<TopicPartition, Long>(); - for(TopicPartition partition : partitions) { - positions.put(partition, consumedOffsets.get(partition)); - } - return positions; + public synchronized long position(TopicPartition partition) { + ensureNotClosed(); + return subscriptions.consumed(partition); + } + + @Override + public synchronized void seekToBeginning(TopicPartition... partitions) { + ensureNotClosed(); + throw new UnsupportedOperationException(); } @Override - public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp, - Collection<TopicPartition> partitions) { + public synchronized void seekToEnd(TopicPartition... partitions) { + ensureNotClosed(); throw new UnsupportedOperationException(); } @Override public Map<MetricName, ? extends Metric> metrics() { - return null; + ensureNotClosed(); + return Collections.emptyMap(); } @Override - public void close() { - // unsubscribe from all partitions - TopicPartition[] allPartitions = new TopicPartition[subscribedPartitions.size()]; - unsubscribe(subscribedPartitions.toArray(allPartitions)); + public synchronized List<PartitionInfo> partitionsFor(String topic) { + ensureNotClosed(); + List<PartitionInfo> parts = this.partitions.get(topic); + if (parts == null) + return Collections.emptyList(); + else + return parts; + } + + public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) { + ensureNotClosed(); + this.partitions.put(topic, partitions); + } + + @Override + public synchronized void close() { + ensureNotClosed(); + this.closed = true; + } + + private void ensureNotClosed() { + if (this.closed) + throw new IllegalStateException("This consumer has already been closed."); } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java new file mode 100644 index 0000000..a21f97b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.KafkaException; + +/** + * Indicates that there is no stored offset and no defined offset reset policy + */ +public class NoOffsetForPartitionException extends KafkaException { + + private static final long serialVersionUID = 1L; + + public NoOffsetForPartitionException(String message) { + super(message); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java deleted file mode 100644 index ea423ad..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer; - -import java.util.Map; - -import org.apache.kafka.common.TopicPartition; - -/** - * The metadata for an offset commit that has been acknowledged by the server - */ -public final class OffsetMetadata { - - private final Map<TopicPartition, Long> offsets; - private final Map<TopicPartition, RuntimeException> errors; - - public OffsetMetadata(Map<TopicPartition, Long> offsets, Map<TopicPartition, RuntimeException> errors) { - super(); - this.offsets = offsets; - this.errors = errors; - } - - public OffsetMetadata(Map<TopicPartition, Long> offsets) { - this(offsets, null); - } - - /** - * The offset of the record in the topic/partition. - */ - public long offset(TopicPartition partition) { - if(this.errors != null) - throw errors.get(partition); - return offsets.get(partition); - } - - /** - * @return The exception corresponding to the error code returned by the server - */ - public Exception error(TopicPartition partition) { - if(errors != null) - return errors.get(partition); - else - return null; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java new file mode 100644 index 0000000..d9483ec --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java @@ -0,0 +1,47 @@ +package org.apache.kafka.clients.consumer.internals; + +/** + * A helper class for managing the heartbeat to the co-ordinator + */ +public final class Heartbeat { + + /* The number of heartbeats to attempt to complete per session timeout interval. + * so, e.g., with a session timeout of 3 seconds we would attempt a heartbeat + * once per second. + */ + private final static int HEARTBEATS_PER_SESSION_INTERVAL = 3; + + private final long timeout; + private long lastHeartbeatSend; + private long lastHeartbeatResponse; + + public Heartbeat(long timeout, long now) { + this.timeout = timeout; + this.lastHeartbeatSend = now; + this.lastHeartbeatResponse = now; + } + + public void sentHeartbeat(long now) { + this.lastHeartbeatSend = now; + } + + public void receivedResponse(long now) { + this.lastHeartbeatResponse = now; + } + + public void markDead() { + this.lastHeartbeatResponse = -1; + } + + public boolean isAlive(long now) { + return now - lastHeartbeatResponse <= timeout; + } + + public boolean shouldHeartbeat(long now) { + return now - lastHeartbeatSend > (1.0 / HEARTBEATS_PER_SESSION_INTERVAL) * this.timeout; + } + + public long lastHeartbeatSend() { + return this.lastHeartbeatSend; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java new file mode 100644 index 0000000..7e57a39 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import java.util.Collection; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback; +import org.apache.kafka.common.TopicPartition; + +public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback { + + @Override + public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {} + + @Override + public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {} + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java new file mode 100644 index 0000000..71ce20d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -0,0 +1,166 @@ +package org.apache.kafka.clients.consumer.internals; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.TopicPartition; + +/** + * A class for tracking the topics, partitions, and offsets for the consumer + */ +public class SubscriptionState { + + /* the list of topics the user has requested */ + private final Set<String> subscribedTopics; + + /* the list of partitions the user has requested */ + private final Set<TopicPartition> subscribedPartitions; + + /* the list of partitions currently assigned */ + private final Set<TopicPartition> assignedPartitions; + + /* the offset exposed to the user */ + private final Map<TopicPartition, Long> consumed; + + /* the current point we have fetched up to */ + private final Map<TopicPartition, Long> fetched; + + /* the last committed offset for each partition */ + private final Map<TopicPartition, Long> committed; + + /* do we need to request a partition assignment from the co-ordinator? */ + private boolean needsPartitionAssignment; + + public SubscriptionState() { + this.subscribedTopics = new HashSet<String>(); + this.subscribedPartitions = new HashSet<TopicPartition>(); + this.assignedPartitions = new HashSet<TopicPartition>(); + this.consumed = new HashMap<TopicPartition, Long>(); + this.fetched = new HashMap<TopicPartition, Long>(); + this.committed = new HashMap<TopicPartition, Long>(); + this.needsPartitionAssignment = false; + } + + public void subscribe(String topic) { + if (this.subscribedPartitions.size() > 0) + throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive"); + if (!this.subscribedTopics.contains(topic)) { + this.subscribedTopics.add(topic); + this.needsPartitionAssignment = true; + } + } + + public void unsubscribe(String topic) { + if (!this.subscribedTopics.contains(topic)) + throw new IllegalStateException("Topic " + topic + " was never subscribed to."); + this.subscribedTopics.remove(topic); + this.needsPartitionAssignment = true; + for(TopicPartition tp: assignedPartitions()) + if(topic.equals(tp.topic())) + clearPartition(tp); + } + + public void subscribe(TopicPartition tp) { + if (this.subscribedTopics.size() > 0) + throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive"); + this.subscribedPartitions.add(tp); + this.assignedPartitions.add(tp); + } + + public void unsubscribe(TopicPartition partition) { + if (!subscribedPartitions.contains(partition)) + throw new IllegalStateException("Partition " + partition + " was never subscribed to."); + subscribedPartitions.remove(partition); + clearPartition(partition); + } + + private void clearPartition(TopicPartition tp) { + this.assignedPartitions.remove(tp); + this.committed.remove(tp); + this.fetched.remove(tp); + this.consumed.remove(tp); + } + + public void clearAssignment() { + this.assignedPartitions.clear(); + this.committed.clear(); + this.fetched.clear(); + this.needsPartitionAssignment = !subscribedTopics().isEmpty(); + } + + public Set<String> subscribedTopics() { + return this.subscribedTopics; + } + + public Long fetched(TopicPartition tp) { + return this.fetched.get(tp); + } + + public void fetched(TopicPartition tp, long offset) { + if (!this.assignedPartitions.contains(tp)) + throw new IllegalArgumentException("Can't change the fetch position for a partition you are not currently subscribed to."); + this.fetched.put(tp, offset); + } + + public void committed(TopicPartition tp, long offset) { + this.committed.put(tp, offset); + } + + public Long committed(TopicPartition tp) { + return this.committed.get(tp); + } + + public void seek(TopicPartition tp, long offset) { + fetched(tp, offset); + consumed(tp, offset); + } + + public Set<TopicPartition> assignedPartitions() { + return this.assignedPartitions; + } + + public boolean partitionsAutoAssigned() { + return !this.subscribedTopics.isEmpty(); + } + + public void consumed(TopicPartition tp, long offset) { + if (!this.assignedPartitions.contains(tp)) + throw new IllegalArgumentException("Can't change the consumed position for a partition you are not currently subscribed to."); + this.consumed.put(tp, offset); + } + + public Long consumed(TopicPartition partition) { + return this.consumed.get(partition); + } + + public Map<TopicPartition, Long> allConsumed() { + return this.consumed; + } + + public boolean hasAllFetchPositions() { + return this.fetched.size() >= this.assignedPartitions.size(); + } + + public Set<TopicPartition> missingFetchPositions() { + Set<TopicPartition> copy = new HashSet<TopicPartition>(this.assignedPartitions); + copy.removeAll(this.fetched.keySet()); + return copy; + } + + public boolean needsPartitionAssignment() { + return this.needsPartitionAssignment; + } + + public void changePartitionAssignment(List<TopicPartition> assignments) { + for (TopicPartition tp : assignments) + if (!this.subscribedTopics.contains(tp.topic())) + throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic."); + this.clearAssignment(); + this.assignedPartitions.addAll(assignments); + this.needsPartitionAssignment = false; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index fc71710..ebc4c53 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -329,8 +329,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> { " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer"); } - ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), serializedKey, serializedValue); - int partition = partitioner.partition(serializedRecord, metadata.fetch()); + int partition = partitioner.partition(record.topic(), serializedKey, record.partition(), metadata.fetch()); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 904976f..84530f2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -100,7 +100,7 @@ public class MockProducer implements Producer<byte[], byte[]> { public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) { int partition = 0; if (this.cluster.partitionsForTopic(record.topic()) != null) - partition = partitioner.partition(record, this.cluster); + partition = partitioner.partition(record.topic(), record.key(), record.partition(), this.cluster); ProduceRequestResult result = new ProduceRequestResult(); FutureRecordMetadata future = new FutureRecordMetadata(result, 0); TopicPartition topicPartition = new TopicPartition(record.topic(), partition); http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 8b3e565..9a43d66 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -16,9 +16,9 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; -import java.util.Arrays; import java.util.Map; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -38,23 +38,17 @@ public class ProducerConfig extends AbstractConfig { private static final ConfigDef config; /** <code>bootstrap.servers</code> */ - public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; - private static final String BOOSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Data will be load " + "balanced over all servers irrespective of which servers are specified here for bootstrapping—this list only " - + "impacts the initial hosts used to discover the full set of servers. This list should be in the form " - + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to " - + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of " - + "servers (you may want more than one, though, in case a server is down). If no server in this list is available sending " - + "data will fail until on becomes available."; + public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; /** <code>metadata.fetch.timeout.ms</code> */ public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; - private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the " + "topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata " + private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This " + "fetch to succeed before throwing an exception back to the client."; /** <code>metadata.max.age.ms</code> */ - public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms"; - private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + " partition leadership changes to proactively discover any new brokers or partitions."; - + public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; + private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; + /** <code>batch.size</code> */ public static final String BATCH_SIZE_CONFIG = "batch.size"; private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the " @@ -113,17 +107,13 @@ public class ProducerConfig extends AbstractConfig { + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load."; /** <code>client.id</code> */ - public static final String CLIENT_ID_CONFIG = "client.id"; - private static final String CLIENT_ID_DOC = "The id string to pass to the server when making requests. The purpose of this is to be able to track the source " + "of requests beyond just ip/port by allowing a logical application name to be included with the request. The " - + "application can set any string it wants as this has no functional purpose other than in logging and metrics."; + public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; /** <code>send.buffer.bytes</code> */ - public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; - private static final String SEND_BUFFER_DOC = "The size of the TCP send buffer to use when sending data"; + public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG; /** <code>receive.buffer.bytes</code> */ - public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes"; - private static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer to use when reading data"; + public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG; /** <code>max.request.size</code> */ public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size"; @@ -131,8 +121,7 @@ public class ProducerConfig extends AbstractConfig { + "batches the producer will send in a single request to avoid sending huge requests."; /** <code>reconnect.backoff.ms</code> */ - public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; - private static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time to wait before attempting to reconnect to a given host when a connection fails." + " This avoids a scenario where the client repeatedly attempts to connect to a host in a tight loop."; + public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG; /** <code>block.on.buffer.full</code> */ public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full"; @@ -147,8 +136,7 @@ public class ProducerConfig extends AbstractConfig { + "may appear first."; /** <code>retry.backoff.ms</code> */ - public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; - private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed produce request to a given topic partition." + " This avoids repeated sending-and-failing in a tight loop."; + public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG; /** <code>compression.type</code> */ public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; @@ -156,17 +144,13 @@ public class ProducerConfig extends AbstractConfig { + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression)."; /** <code>metrics.sample.window.ms</code> */ - public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; - private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. " - + "When a window expires we erase and overwrite the oldest window."; + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; /** <code>metrics.num.samples</code> */ - public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples"; - private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics."; + public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; /** <code>metric.reporters</code> */ - public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; - private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; + public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; /** <code>max.in.flight.requests.per.connection</code> */ public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; @@ -183,22 +167,22 @@ public class ProducerConfig extends AbstractConfig { private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface."; static { - config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC) + config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC) .define(ACKS_CONFIG, Type.STRING, "1", - in(Arrays.asList("all", "-1", "0", "1")), + in("all","-1", "0", "1"), Importance.HIGH, ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC) .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC) - .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC) - .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC) - .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC) + .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) + .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) + .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC) .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, @@ -206,9 +190,9 @@ public class ProducerConfig extends AbstractConfig { Importance.MEDIUM, MAX_REQUEST_SIZE_DOC) .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC) - .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, RECONNECT_BACKOFF_MS_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC) - .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC) + .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) + .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) + .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, @@ -221,8 +205,8 @@ public class ProducerConfig extends AbstractConfig { 30000, atLeast(0), Importance.LOW, - METRICS_SAMPLE_WINDOW_MS_DOC) - .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC) + CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) + .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC) .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, Type.INT, 5, http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index dcf4658..3aff624 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -78,9 +78,9 @@ public final class Metadata { } /** - * The next time to update the cluster info is the maximum of the time the current info will expire - * and the time the current info can be updated (i.e. backoff time has elapsed); If an update has - * been request then the expiry time is now + * The next time to update the cluster info is the maximum of the time the current info will expire and the time the + * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time + * is now */ public synchronized long timeToNextUpdate(long nowMs) { long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0); @@ -120,6 +120,15 @@ public final class Metadata { } /** + * Add one or more topics to maintain metadata for + */ + public synchronized void addTopics(String... topics) { + for (String topic : topics) + this.topics.add(topic); + requestUpdate(); + } + + /** * Get the list of topics we are currently maintaining metadata for */ public synchronized Set<String> topics() { @@ -137,6 +146,13 @@ public final class Metadata { notifyAll(); log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); } + + /** + * @return The current metadata version + */ + public synchronized int version() { + return this.version; + } /** * The last time metadata was updated. http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java index 483899d..8112e6d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Utils; @@ -40,32 +39,34 @@ public class Partitioner { /** * Compute the partition for the given record. * - * @param record The record being sent + * @param topic The topic name + * @param key The key to partition on (or null if no key) + * @param partition The partition to use (or null if none) * @param cluster The current cluster metadata */ - public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) { - List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic()); + public int partition(String topic, byte[] key, Integer partition, Cluster cluster) { + List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); - if (record.partition() != null) { + if (partition != null) { // they have given us a partition, use it - if (record.partition() < 0 || record.partition() >= numPartitions) - throw new IllegalArgumentException("Invalid partition given with record: " + record.partition() + if (partition < 0 || partition >= numPartitions) + throw new IllegalArgumentException("Invalid partition given with record: " + partition + " is not in the range [0..." + numPartitions + "]."); - return record.partition(); - } else if (record.key() == null) { + return partition; + } else if (key == null) { // choose the next available node in a round-robin fashion for (int i = 0; i < numPartitions; i++) { - int partition = Utils.abs(counter.getAndIncrement()) % numPartitions; - if (partitions.get(partition).leader() != null) - return partition; + int part = Utils.abs(counter.getAndIncrement()) % numPartitions; + if (partitions.get(part).leader() != null) + return part; } // no partitions are available, give a non-available partition return Utils.abs(counter.getAndIncrement()) % numPartitions; } else { // hash the key to choose a partition - return Utils.abs(Utils.murmur2(record.key())) % numPartitions; + return Utils.abs(Utils.murmur2(key)) % numPartitions; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index ccc03d8..8726809 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -146,7 +147,8 @@ public class Sender implements Runnable { /** * Run a single iteration of sending * - * @param now The current POSIX time in milliseconds + * @param now + * The current POSIX time in milliseconds */ public void run(long now) { Cluster cluster = metadata.fetch(); @@ -169,9 +171,12 @@ public class Sender implements Runnable { } // create produce requests - Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); + Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, + result.readyNodes, + this.maxRequestSize, + now); + sensors.updateProduceRequestMetrics(batches); List<ClientRequest> requests = createProduceRequests(batches, now); - sensors.updateProduceRequestMetrics(requests); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data @@ -183,18 +188,14 @@ public class Sender implements Runnable { log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } + for (ClientRequest request : requests) + client.send(request); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; - List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now); - for (ClientResponse response : responses) { - if (response.wasDisconnected()) - handleDisconnect(response, now); - else - handleResponse(response, now); - } + this.client.poll(pollTimeout, now); } /** @@ -206,45 +207,44 @@ public class Sender implements Runnable { this.wakeup(); } - private void handleDisconnect(ClientResponse response, long now) { - log.trace("Cancelled request {} due to node {} being disconnected", response, response.request().request().destination()); - int correlation = response.request().request().header().correlationId(); - @SuppressWarnings("unchecked") - Map<TopicPartition, RecordBatch> responseBatches = (Map<TopicPartition, RecordBatch>) response.request().attachment(); - for (RecordBatch batch : responseBatches.values()) - completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now); - } - /** * Handle a produce response */ - private void handleResponse(ClientResponse response, long now) { + private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) { int correlationId = response.request().request().header().correlationId(); - log.trace("Received produce response from node {} with correlation id {}", - response.request().request().destination(), - correlationId); - @SuppressWarnings("unchecked") - Map<TopicPartition, RecordBatch> batches = (Map<TopicPartition, RecordBatch>) response.request().attachment(); - // if we have a response, parse it - if (response.hasResponse()) { - ProduceResponse produceResponse = new ProduceResponse(response.responseBody()); - for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) { - TopicPartition tp = entry.getKey(); - ProduceResponse.PartitionResponse partResp = entry.getValue(); - Errors error = Errors.forCode(partResp.errorCode); - RecordBatch batch = batches.get(tp); - completeBatch(batch, error, partResp.baseOffset, correlationId, now); - } - this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); - } else { - // this is the acks = 0 case, just complete all requests + if (response.wasDisconnected()) { + log.trace("Cancelled request {} due to node {} being disconnected", response, response.request() + .request() + .destination()); for (RecordBatch batch : batches.values()) - completeBatch(batch, Errors.NONE, -1L, correlationId, now); + completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now); + } else { + log.trace("Received produce response from node {} with correlation id {}", + response.request().request().destination(), + correlationId); + // if we have a response, parse it + if (response.hasResponse()) { + ProduceResponse produceResponse = new ProduceResponse(response.responseBody()); + for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses() + .entrySet()) { + TopicPartition tp = entry.getKey(); + ProduceResponse.PartitionResponse partResp = entry.getValue(); + Errors error = Errors.forCode(partResp.errorCode); + RecordBatch batch = batches.get(tp); + completeBatch(batch, error, partResp.baseOffset, correlationId, now); + } + this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); + } else { + // this is the acks = 0 case, just complete all requests + for (RecordBatch batch : batches.values()) + completeBatch(batch, Errors.NONE, -1L, correlationId, now); + } } } /** * Complete or retry the given batch of records. + * * @param batch The record batch * @param error The error (or null if none) * @param baseOffset The base offset assigned to the records if successful @@ -294,7 +294,7 @@ public class Sender implements Runnable { */ private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) { Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size()); - Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size()); + final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size()); for (RecordBatch batch : batches) { TopicPartition tp = batch.topicPartition; ByteBuffer recordsBuffer = batch.records.buffer(); @@ -303,8 +303,15 @@ public class Sender implements Runnable { recordsByPartition.put(tp, batch); } ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition); - RequestSend send = new RequestSend(destination, this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct()); - return new ClientRequest(now, acks != 0, send, recordsByPartition); + RequestSend send = new RequestSend(destination, + this.client.nextRequestHeader(ApiKeys.PRODUCE), + request.toStruct()); + RequestCompletionHandler callback = new RequestCompletionHandler() { + public void onComplete(ClientResponse response) { + handleProduceResponse(response, recordsByPartition, time.milliseconds()); + } + }; + return new ClientRequest(now, acks != 0, send, callback); } /** @@ -428,44 +435,38 @@ public class Sender implements Runnable { } } - public void updateProduceRequestMetrics(List<ClientRequest> requests) { + public void updateProduceRequestMetrics(Map<Integer, List<RecordBatch>> batches) { long now = time.milliseconds(); - for (int i = 0; i < requests.size(); i++) { - ClientRequest request = requests.get(i); + for (List<RecordBatch> nodeBatch : batches.values()) { int records = 0; - - if (request.attachment() != null) { - Map<TopicPartition, RecordBatch> responseBatches = (Map<TopicPartition, RecordBatch>) request.attachment(); - for (RecordBatch batch : responseBatches.values()) { - - // register all per-topic metrics at once - String topic = batch.topicPartition.topic(); - maybeRegisterTopicMetrics(topic); - - // per-topic record send rate - String topicRecordsCountName = "topic." + topic + ".records-per-batch"; - Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName)); - topicRecordCount.record(batch.recordCount); - - // per-topic bytes send rate - String topicByteRateName = "topic." + topic + ".bytes"; - Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName)); - topicByteRate.record(batch.records.sizeInBytes()); - - // per-topic compression rate - String topicCompressionRateName = "topic." + topic + ".compression-rate"; - Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName)); - topicCompressionRate.record(batch.records.compressionRate()); - - // global metrics - this.batchSizeSensor.record(batch.records.sizeInBytes(), now); - this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now); - this.compressionRateSensor.record(batch.records.compressionRate()); - this.maxRecordSizeSensor.record(batch.maxRecordSize, now); - records += batch.recordCount; - } - this.recordsPerRequestSensor.record(records, now); + for (RecordBatch batch : nodeBatch) { + // register all per-topic metrics at once + String topic = batch.topicPartition.topic(); + maybeRegisterTopicMetrics(topic); + + // per-topic record send rate + String topicRecordsCountName = "topic." + topic + ".records-per-batch"; + Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName)); + topicRecordCount.record(batch.recordCount); + + // per-topic bytes send rate + String topicByteRateName = "topic." + topic + ".bytes"; + Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName)); + topicByteRate.record(batch.records.sizeInBytes()); + + // per-topic compression rate + String topicCompressionRateName = "topic." + topic + ".compression-rate"; + Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName)); + topicCompressionRate.record(batch.records.compressionRate()); + + // global metrics + this.batchSizeSensor.record(batch.records.sizeInBytes(), now); + this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now); + this.compressionRateSensor.record(batch.records.compressionRate()); + this.maxRecordSizeSensor.record(batch.maxRecordSize, now); + records += batch.recordCount; } + this.recordsPerRequestSensor.record(records, now); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/Cluster.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index d3299b9..d7ccbcd 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -26,6 +26,7 @@ public final class Cluster { private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; private final Map<String, List<PartitionInfo>> partitionsByTopic; private final Map<Integer, List<PartitionInfo>> partitionsByNode; + private final Map<Integer, Node> nodesById; /** * Create a new cluster with the given nodes and partitions @@ -37,6 +38,10 @@ public final class Cluster { List<Node> copy = new ArrayList<Node>(nodes); Collections.shuffle(copy); this.nodes = Collections.unmodifiableList(copy); + + this.nodesById = new HashMap<Integer, Node>(); + for(Node node: nodes) + this.nodesById.put(node.id(), node); // index the partitions by topic/partition for quick lookup this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size()); @@ -97,6 +102,15 @@ public final class Cluster { public List<Node> nodes() { return this.nodes; } + + /** + * Get the node by the node id (or null if no such node exists) + * @param id The id of the node + * @return The node, or null if no such node exists + */ + public Node nodeById(int id) { + return this.nodesById.get(id); + } /** * Get the current leader for the given topic-partition http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java index b15aa2c..28562f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java +++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java @@ -69,10 +69,10 @@ public class PartitionInfo { @Override public String toString() { - return String.format("Partition(topic = %s, partition = %d, leader = %d, replicas = %s, isr = %s", + return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s", topic, partition, - leader.id(), + leader == null? "none" : leader.id(), fmtNodeIds(replicas), fmtNodeIds(inSyncReplicas)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 98cb79b..38ce10b 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -21,6 +21,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.kafka.common.utils.Utils; + /** * This class is used for specifying the set of expected configurations, their type, their defaults, their * documentation, and any special validation logic used for checking the correctness of the values the user provides. @@ -292,39 +294,23 @@ public class ConfigDef { this.validStrings = validStrings; } - public static ValidString in(List<String> validStrings) { - return new ValidString(validStrings); + public static ValidString in(String... validStrings) { + return new ValidString(Arrays.asList(validStrings)); } @Override public void ensureValid(String name, Object o) { - String s = (String) o; - if (!validStrings.contains(s)) { - throw new ConfigException(name,o,"String must be one of:" +join(validStrings)); + throw new ConfigException(name,o,"String must be one of: " + Utils.join(validStrings, ", ")); } } public String toString() { - return "[" + join(validStrings) + "]"; + return "[" + Utils.join(validStrings, ", ") + "]"; } - private String join(List<String> list) - { - StringBuilder sb = new StringBuilder(); - boolean first = true; - for (String item : list) - { - if (first) - first = false; - else - sb.append(","); - sb.append(item); - } - return sb.toString(); - } } private static class ConfigKey { http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java b/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java index 7c948b1..a566b90 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.KafkaException; * Any API exception that is part of the public protocol and should be a subclass of this class and be part of this * package. */ -public abstract class ApiException extends KafkaException { +public class ApiException extends KafkaException { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/network/Selectable.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java index b68bbf0..b5f8d83 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.network; @@ -51,13 +47,17 @@ public interface Selectable { public void close(); /** - * Initiate any sends provided, and make progress on any other I/O operations in-flight (connections, - * disconnections, existing sends, and receives) + * Queue the given request for sending in the subsequent {@poll(long)} calls + * @param send The request to send + */ + public void send(NetworkSend send); + + /** + * Do I/O. Reads, writes, connection establishment, etc. * @param timeout The amount of time to block if there is nothing to do - * @param sends The new sends to initiate * @throws IOException */ - public void poll(long timeout, List<NetworkSend> sends) throws IOException; + public void poll(long timeout) throws IOException; /** * The list of sends that completed on the last {@link #poll(long, List) poll()} call. @@ -81,4 +81,26 @@ public interface Selectable { */ public List<Integer> connected(); + /** + * Disable reads from the given connection + * @param id The id for the connection + */ + public void mute(int id); + + /** + * Re-enable reads from the given connection + * @param id The id for the connection + */ + public void unmute(int id); + + /** + * Disable reads from all connections + */ + public void muteAll(); + + /** + * Re-enable reads from all connections + */ + public void unmuteAll(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 74d695b..e18a769 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -12,6 +12,7 @@ */ package org.apache.kafka.common.network; +import java.io.EOFException; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -81,6 +82,7 @@ public class Selector implements Selectable { private final List<NetworkReceive> completedReceives; private final List<Integer> disconnected; private final List<Integer> connected; + private final List<Integer> failedSends; private final Time time; private final SelectorMetrics sensors; private final String metricGrpPrefix; @@ -103,6 +105,7 @@ public class Selector implements Selectable { this.completedReceives = new ArrayList<NetworkReceive>(); this.connected = new ArrayList<Integer>(); this.disconnected = new ArrayList<Integer>(); + this.failedSends = new ArrayList<Integer>(); this.sensors = new SelectorMetrics(metrics); } @@ -179,10 +182,26 @@ public class Selector implements Selectable { } /** + * Queue the given request for sending in the subsequent {@poll(long)} calls + * @param send The request to send + */ + public void send(NetworkSend send) { + SelectionKey key = keyForId(send.destination()); + Transmissions transmissions = transmissions(key); + if (transmissions.hasSend()) + throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); + transmissions.send = send; + try { + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + } catch (CancelledKeyException e) { + close(key); + this.failedSends.add(send.destination()); + } + } + + /** * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing * disconnections, initiating new sends, or making progress on in-progress sends or receives. - * <p> - * The provided network sends will be started. * * When this call is completed the user can check for completed sends, receives, connections or disconnects using * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These @@ -190,29 +209,13 @@ public class Selector implements Selectable { * completed I/O. * * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely. - * @param sends The list of new sends to begin - * * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is * already an in-progress send */ @Override - public void poll(long timeout, List<NetworkSend> sends) throws IOException { + public void poll(long timeout) throws IOException { clear(); - /* register for write interest on any new sends */ - for (NetworkSend send : sends) { - SelectionKey key = keyForId(send.destination()); - Transmissions transmissions = transmissions(key); - if (transmissions.hasSend()) - throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); - transmissions.send = send; - try { - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); - } catch (CancelledKeyException e) { - close(key); - } - } - /* check ready keys */ long startSelect = time.nanoseconds(); int readyKeys = select(timeout); @@ -266,21 +269,34 @@ public class Selector implements Selectable { } /* cancel any defunct sockets */ - if (!key.isValid()) + if (!key.isValid()) { close(key); + this.disconnected.add(transmissions.id); + } } catch (IOException e) { - InetAddress remoteAddress = null; - Socket socket = channel.socket(); - if (socket != null) - remoteAddress = socket.getInetAddress(); - log.warn("Error in I/O with {}", remoteAddress , e); + String desc = socketDescription(channel); + if(e instanceof EOFException) + log.info("Connection {} disconnected", desc); + else + log.warn("Error in I/O with connection to {}", desc, e); close(key); + this.disconnected.add(transmissions.id); } } } long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); } + + private String socketDescription(SocketChannel channel) { + Socket socket = channel.socket(); + if(socket == null) + return "[unconnected socket]"; + else if(socket.getInetAddress() != null) + return socket.getInetAddress().toString(); + else + return socket.getLocalAddress().toString(); + } @Override public List<NetworkSend> completedSends() { @@ -302,6 +318,36 @@ public class Selector implements Selectable { return this.connected; } + @Override + public void mute(int id) { + mute(this.keyForId(id)); + } + + private void mute(SelectionKey key) { + key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); + } + + @Override + public void unmute(int id) { + unmute(this.keyForId(id)); + } + + private void unmute(SelectionKey key) { + key.interestOps(key.interestOps() | SelectionKey.OP_READ); + } + + @Override + public void muteAll() { + for (SelectionKey key : this.keys.values()) + mute(key); + } + + @Override + public void unmuteAll() { + for (SelectionKey key : this.keys.values()) + unmute(key); + } + /** * Clear the results from the prior poll */ @@ -310,6 +356,8 @@ public class Selector implements Selectable { this.completedReceives.clear(); this.connected.clear(); this.disconnected.clear(); + this.disconnected.addAll(this.failedSends); + this.failedSends.clear(); } /** @@ -335,7 +383,6 @@ public class Selector implements Selectable { SocketChannel channel = channel(key); Transmissions trans = transmissions(key); if (trans != null) { - this.disconnected.add(trans.id); this.keys.remove(trans.id); trans.clearReceive(); trans.clearSend(); http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 3316b6a..a8deac4 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -19,36 +19,62 @@ package org.apache.kafka.common.protocol; import java.util.HashMap; import java.util.Map; -import org.apache.kafka.common.errors.*; - +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.LeaderNotAvailableException; +import org.apache.kafka.common.errors.NetworkException; +import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.NotLeaderForPartitionException; +import org.apache.kafka.common.errors.OffsetMetadataTooLarge; +import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; +import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; /** * This class contains all the client-server errors--those errors that must be sent from the server to the client. These * are thus part of the protocol. The names can be changed but the error code cannot. - * + * * Do not add exceptions that occur only on the client or only on the server here. */ public enum Errors { UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")), NONE(0, null), - OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")), - CORRUPT_MESSAGE(2, new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")), - UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")), + OFFSET_OUT_OF_RANGE(1, + new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")), + CORRUPT_MESSAGE(2, + new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")), + UNKNOWN_TOPIC_OR_PARTITION(3, + new UnknownTopicOrPartitionException("This server does not host this topic-partition.")), // TODO: errorCode 4 for InvalidFetchSize - LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")), - NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")), + LEADER_NOT_AVAILABLE(5, + new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")), + NOT_LEADER_FOR_PARTITION(6, + new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")), REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")), - // TODO: errorCode 8, 9, 11 - MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), + MESSAGE_TOO_LARGE(10, + new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")), - // TODO: errorCode 14, 15, 16 - INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")), - RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")), - NOT_ENOUGH_REPLICAS(19, new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")), - NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")); + OFFSET_LOAD_IN_PROGRESS(14, new ApiException("The coordinator is loading offsets and can't process requests.")), + CONSUMER_COORDINATOR_NOT_AVAILABLE(15, new ApiException("The coordinator is not available.")), + NOT_COORDINATOR_FOR_CONSUMER(16, new ApiException("This is not the correct co-ordinator for this consumer.")), + INVALID_TOPIC_EXCEPTION(17, + new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")), + RECORD_LIST_TOO_LARGE(18, + new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")), + NOT_ENOUGH_REPLICAS(19, + new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")), + NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, + new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required.")); + private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>(); private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>(); + static { for (Errors error : Errors.values()) { codeToError.put(error.code(), error); @@ -84,8 +110,9 @@ public enum Errors { * Throw the exception corresponding to this error if there is one */ public void maybeThrow() { - if (exception != null) + if (exception != null) { throw this.exception; + } } /**