http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index fa88ac1..f50da82 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -9,185 +9,174 @@
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
-*/
+ */
 package org.apache.kafka.clients.consumer;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.MetricName;
 
 /**
- * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka.
- * This class is <i> not threadsafe </i>
+ * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
+ * threadsafe </i>
  * <p>
- * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it
- * needs to communicate with. Failure to close the consumer after use will leak these resources.
+ * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it needs to
+ * communicate with. Failure to close the consumer after use will leak these resources.
  */
-public class MockConsumer implements Consumer<byte[], byte[]> {
+public class MockConsumer<K, V> implements Consumer<K, V> {
+
+    private final Map<String, List<PartitionInfo>> partitions;
+    private final SubscriptionState subscriptions;
+    private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private boolean closed;
 
-    private final Set<TopicPartition> subscribedPartitions;
-    private final Set<String> subscribedTopics;
-    private final Map<TopicPartition, Long> committedOffsets; 
-    private final Map<TopicPartition, Long> consumedOffsets;
-    
     public MockConsumer() {
-        subscribedPartitions = new HashSet<TopicPartition>();
-        subscribedTopics = new HashSet<String>();
-        committedOffsets = new HashMap<TopicPartition, Long>();
-        consumedOffsets = new HashMap<TopicPartition, Long>();
+        this.subscriptions = new SubscriptionState();
+        this.partitions = new HashMap<String, List<PartitionInfo>>();
+        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+        this.closed = false;
     }
     
     @Override
-    public void subscribe(String... topics) {
-        if(subscribedPartitions.size() > 0)
-            throw new IllegalStateException("Subcription to topics and 
partitions is mutually exclusive");
-        for(String topic : topics) {
-            subscribedTopics.add(topic);
-        }
+    public synchronized Set<TopicPartition> subscriptions() {
+        return this.subscriptions.assignedPartitions();
     }
 
     @Override
-    public void subscribe(TopicPartition... partitions) {
-        if(subscribedTopics.size() > 0)
-            throw new IllegalStateException("Subcription to topics and 
partitions is mutually exclusive");
-        for(TopicPartition partition : partitions) {
-            subscribedPartitions.add(partition);
-            consumedOffsets.put(partition, 0L);
-        }
+    public synchronized void subscribe(String... topics) {
+        ensureNotClosed();
+        for (String topic : topics)
+            this.subscriptions.subscribe(topic);
     }
 
-    public void unsubscribe(String... topics) {
-        // throw an exception if the topic was never subscribed to
-        for(String topic:topics) {
-            if(!subscribedTopics.contains(topic))
-                throw new IllegalStateException("Topic " + topic + " was never 
subscribed to. subscribe(" + topic + ") should be called prior" +
-                        " to unsubscribe(" + topic + ")");
-            subscribedTopics.remove(topic);
-        }
+    @Override
+    public synchronized void subscribe(TopicPartition... partitions) {
+        ensureNotClosed();
+        for (TopicPartition partition : partitions)
+            this.subscriptions.subscribe(partition);
     }
 
-    public void unsubscribe(TopicPartition... partitions) {
-        // throw an exception if the partition was never subscribed to
-        for(TopicPartition partition:partitions) {
-            if(!subscribedPartitions.contains(partition))
-                throw new IllegalStateException("Partition " + partition + " 
was never subscribed to. subscribe(new TopicPartition(" + 
-                                                 partition.topic() + "," + 
partition.partition() + ") should be called prior" +
-                                                " to unsubscribe(new 
TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
-            subscribedPartitions.remove(partition);                    
-            committedOffsets.remove(partition);
-            consumedOffsets.remove(partition);
-        }
+    public synchronized void unsubscribe(String... topics) {
+        ensureNotClosed();
+        for (String topic : topics)
+            this.subscriptions.unsubscribe(topic);
+    }
+
+    public synchronized void unsubscribe(TopicPartition... partitions) {
+        ensureNotClosed();
+        for (TopicPartition partition : partitions)
+            this.subscriptions.unsubscribe(partition);
     }
 
     @Override
-    public Map<String, ConsumerRecords<byte[], byte[]>> poll(long timeout) {
-        // hand out one dummy record, 1 per topic
-        Map<String, List<ConsumerRecord>> records = new HashMap<String, List<ConsumerRecord>>();
-        Map<String, ConsumerRecords<byte[], byte[]>> recordMetadata = new HashMap<String, ConsumerRecords<byte[], byte[]>>();
-        for(TopicPartition partition : subscribedPartitions) {
-            // get the last consumed offset
-            long messageSequence = consumedOffsets.get(partition);
-            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
-            ObjectOutputStream outputStream;
-            try {
-                outputStream = new ObjectOutputStream(byteStream);
-                outputStream.writeLong(messageSequence++);
-                outputStream.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-            List<ConsumerRecord> recordsForTopic = records.get(partition.topic());
-            if(recordsForTopic == null) {
-                recordsForTopic = new ArrayList<ConsumerRecord>();
-                records.put(partition.topic(), recordsForTopic);
-            }
-            recordsForTopic.add(new ConsumerRecord(partition.topic(), partition.partition(), null, byteStream.toByteArray(), messageSequence));
-            consumedOffsets.put(partition, messageSequence);
+    public synchronized ConsumerRecords<K, V> poll(long timeout) {
+        ensureNotClosed();
+        // update the consumed offset
+        for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
+            List<ConsumerRecord<K, V>> recs = entry.getValue();
+            if (!recs.isEmpty())
+                this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
         }
-        for(Entry<String, List<ConsumerRecord>> recordsPerTopic : records.entrySet()) {
-            Map<Integer, List<ConsumerRecord>> recordsPerPartition = new HashMap<Integer, List<ConsumerRecord>>();
-            for(ConsumerRecord record : recordsPerTopic.getValue()) {
-                List<ConsumerRecord> recordsForThisPartition = recordsPerPartition.get(record.partition());
-                if(recordsForThisPartition == null) {
-                    recordsForThisPartition = new ArrayList<ConsumerRecord>();
-                    recordsPerPartition.put(record.partition(), recordsForThisPartition);
-                }
-                recordsForThisPartition.add(record);
-            }
-            recordMetadata.put(recordsPerTopic.getKey(), new ConsumerRecords(recordsPerTopic.getKey(), recordsPerPartition));
+
+        ConsumerRecords<K, V> copy = new ConsumerRecords<K, V>(this.records);
+        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+        return copy;
+    }
+
+    public synchronized void addRecord(ConsumerRecord<K, V> record) {
+        ensureNotClosed();
+        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+        this.subscriptions.assignedPartitions().add(tp);
+        List<ConsumerRecord<K, V>> recs = this.records.get(tp);
+        if (recs == null) {
+            recs = new ArrayList<ConsumerRecord<K, V>>();
+            this.records.put(tp, recs);
         }
-        return recordMetadata;
+        recs.add(record);
     }
 
     @Override
-    public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync) {
-        if(!sync)
-            return null;
-        for(Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
-            committedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue());
-        }        
-        return new OffsetMetadata(committedOffsets, null);
+    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
+        ensureNotClosed();
+        for (Entry<TopicPartition, Long> entry : offsets.entrySet())
+            subscriptions.committed(entry.getKey(), entry.getValue());
     }
 
     @Override
-    public OffsetMetadata commit(boolean sync) {
-        if(!sync)
-            return null;
-        return commit(consumedOffsets, sync);
+    public synchronized void commit(CommitType commitType) {
+        ensureNotClosed();
+        commit(this.subscriptions.allConsumed(), commitType);
     }
 
     @Override
-    public void seek(Map<TopicPartition, Long> offsets) {
-        // change the fetch offsets
-        for(Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
-            consumedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue());
-        }
+    public synchronized void seek(TopicPartition partition, long offset) {
+        ensureNotClosed();
+        subscriptions.seek(partition, offset);
     }
 
     @Override
-    public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions) {
-        Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
-        for(TopicPartition partition : partitions) {
-            offsets.put(new TopicPartition(partition.topic(), partition.partition()), committedOffsets.get(partition));
-        }
-        return offsets;
+    public synchronized long committed(TopicPartition partition) {
+        ensureNotClosed();
+        return subscriptions.committed(partition);
     }
 
     @Override
-    public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions) {
-        Map<TopicPartition, Long> positions = new HashMap<TopicPartition, Long>();
-        for(TopicPartition partition : partitions) {
-            positions.put(partition, consumedOffsets.get(partition));
-        }
-        return positions;
+    public synchronized long position(TopicPartition partition) {
+        ensureNotClosed();
+        return subscriptions.consumed(partition);
+    }
+
+    @Override
+    public synchronized void seekToBeginning(TopicPartition... partitions) {
+        ensureNotClosed();
+        throw new UnsupportedOperationException();
     }
 
     @Override
-    public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp,
-            Collection<TopicPartition> partitions) {
+    public synchronized void seekToEnd(TopicPartition... partitions) {
+        ensureNotClosed();
         throw new UnsupportedOperationException();
     }
 
     @Override
     public Map<MetricName, ? extends Metric> metrics() {
-        return null;
+        ensureNotClosed();
+        return Collections.emptyMap();
     }
 
     @Override
-    public void close() {
-       // unsubscribe from all partitions
-        TopicPartition[] allPartitions = new TopicPartition[subscribedPartitions.size()];
-        unsubscribe(subscribedPartitions.toArray(allPartitions));
+    public synchronized List<PartitionInfo> partitionsFor(String topic) {
+        ensureNotClosed();
+        List<PartitionInfo> parts = this.partitions.get(topic);
+        if (parts == null)
+            return Collections.emptyList();
+        else
+            return parts;
+    }
+
+    public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
+        ensureNotClosed();
+        this.partitions.put(topic, partitions);
+    }
+
+    @Override
+    public synchronized void close() {
+        ensureNotClosed();
+        this.closed = true;
+    }
+
+    private void ensureNotClosed() {
+        if (this.closed)
+            throw new IllegalStateException("This consumer has already been 
closed.");
     }
 }
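
The reworked mock is generic and push-driven: a test queues records with addRecord() and drains them with poll(). A minimal test sketch (hypothetical code; the ConsumerRecord constructor shape (topic, partition, offset, key, value) and the CommitType.SYNC constant are assumed here, since neither signature appears in this hunk):

    MockConsumer<String, String> consumer = new MockConsumer<String, String>();
    TopicPartition tp = new TopicPartition("test", 0);
    consumer.subscribe(tp);
    consumer.addRecord(new ConsumerRecord<String, String>("test", 0, 0L, "k", "v"));

    // poll() returns everything queued so far and advances the consumed position
    ConsumerRecords<String, String> records = consumer.poll(0);
    long position = consumer.position(tp);  // offset of the last record handed out
    consumer.commit(CommitType.SYNC);       // recorded via SubscriptionState.committed()
    consumer.close();                       // any later call throws IllegalStateException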

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
new file mode 100644
index 0000000..a21f97b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Indicates that there is no stored offset and no defined offset reset policy
+ */
+public class NoOffsetForPartitionException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NoOffsetForPartitionException(String message) {
+        super(message);
+    }
+
+}
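
A usage sketch (hypothetical calling code): the exception is unchecked, so a caller that disables automatic offset resets would catch it around poll() and choose a starting position itself:

    TopicPartition tp = new TopicPartition("test", 0);
    try {
        consumer.poll(100);
    } catch (NoOffsetForPartitionException e) {
        // no committed offset and no reset policy: pick a position explicitly
        consumer.seek(tp, 0L);
    }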

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
deleted file mode 100644
index ea423ad..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer;
-
-import java.util.Map;
-
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * The metadata for an offset commit that has been acknowledged by the server
- */
-public final class OffsetMetadata {
-
-    private final Map<TopicPartition, Long> offsets;
-    private final Map<TopicPartition, RuntimeException> errors;
-    
-    public OffsetMetadata(Map<TopicPartition, Long> offsets, Map<TopicPartition, RuntimeException> errors) {
-        super();
-        this.offsets = offsets;
-        this.errors = errors;
-    }
-
-    public OffsetMetadata(Map<TopicPartition, Long> offsets) {
-        this(offsets, null);
-    }
-
-    /**
-     * The offset of the record in the topic/partition.
-     */
-    public long offset(TopicPartition partition) {
-        if(this.errors != null)
-            throw errors.get(partition);
-        return offsets.get(partition);
-    }
-
-    /**
-     * @return The exception corresponding to the error code returned by the server
-     */
-    public Exception error(TopicPartition partition) {
-        if(errors != null)
-            return errors.get(partition);
-        else
-            return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
new file mode 100644
index 0000000..d9483ec
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -0,0 +1,47 @@
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * A helper class for managing the heartbeat to the co-ordinator
+ */
+public final class Heartbeat {
+    
+    /* The number of heartbeats to attempt to complete per session timeout interval.
+     * So, e.g., with a session timeout of 3 seconds we would attempt a heartbeat
+     * once per second.
+     */
+    private final static int HEARTBEATS_PER_SESSION_INTERVAL = 3;
+
+    private final long timeout;
+    private long lastHeartbeatSend;
+    private long lastHeartbeatResponse;
+
+    public Heartbeat(long timeout, long now) {
+        this.timeout = timeout;
+        this.lastHeartbeatSend = now;
+        this.lastHeartbeatResponse = now;
+    }
+
+    public void sentHeartbeat(long now) {
+        this.lastHeartbeatSend = now;
+    }
+
+    public void receivedResponse(long now) {
+        this.lastHeartbeatResponse = now;
+    }
+
+    public void markDead() {
+        this.lastHeartbeatResponse = -1;
+    }
+
+    public boolean isAlive(long now) {
+        return now - lastHeartbeatResponse <= timeout;
+    }
+
+    public boolean shouldHeartbeat(long now) {
+        return now - lastHeartbeatSend > (1.0 / HEARTBEATS_PER_SESSION_INTERVAL) * this.timeout;
+    }
+    
+    public long lastHeartbeatSend() {
+        return this.lastHeartbeatSend;
+    }
+}
\ No newline at end of file
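
The timing rules are easiest to check with concrete numbers; a small sketch (values chosen for illustration only):

    long now = 0L;
    Heartbeat hb = new Heartbeat(3000L, now);  // 3s session timeout

    hb.shouldHeartbeat(500);   // false: 500ms since last send, threshold is 3000/3 = 1000ms
    hb.shouldHeartbeat(1500);  // true: 1500ms > 1000ms
    hb.sentHeartbeat(1500);
    hb.receivedResponse(1600);

    hb.isAlive(4000);          // true: last response was 2400ms ago, within the 3000ms timeout
    hb.markDead();
    hb.isAlive(4000);          // false until another response is recorded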

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
new file mode 100644
index 0000000..7e57a39
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.Collection;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
+import org.apache.kafka.common.TopicPartition;
+
+public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback {
+
+    @Override
+    public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {}
+
+    @Override
+    public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {}
+
+}
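
This no-op is what gets plugged in when the user supplies no callback of their own. For contrast, a non-trivial implementation might flush offsets before ownership moves, e.g. (sketch; assumes commit(CommitType) is part of the Consumer interface, as the MockConsumer overrides above suggest):

    public class CommitOnRevoke implements ConsumerRebalanceCallback {

        @Override
        public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            // nothing to do: positions are established lazily on the first fetch
        }

        @Override
        public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            // synchronously commit consumed offsets before the partitions are reassigned
            consumer.commit(CommitType.SYNC);
        }
    }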

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
new file mode 100644
index 0000000..71ce20d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -0,0 +1,166 @@
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A class for tracking the topics, partitions, and offsets for the consumer
+ */
+public class SubscriptionState {
+
+    /* the list of topics the user has requested */
+    private final Set<String> subscribedTopics;
+
+    /* the list of partitions the user has requested */
+    private final Set<TopicPartition> subscribedPartitions;
+
+    /* the list of partitions currently assigned */
+    private final Set<TopicPartition> assignedPartitions;
+
+    /* the offset exposed to the user */
+    private final Map<TopicPartition, Long> consumed;
+
+    /* the current point we have fetched up to */
+    private final Map<TopicPartition, Long> fetched;
+
+    /* the last committed offset for each partition */
+    private final Map<TopicPartition, Long> committed;
+
+    /* do we need to request a partition assignment from the co-ordinator? */
+    private boolean needsPartitionAssignment;
+
+    public SubscriptionState() {
+        this.subscribedTopics = new HashSet<String>();
+        this.subscribedPartitions = new HashSet<TopicPartition>();
+        this.assignedPartitions = new HashSet<TopicPartition>();
+        this.consumed = new HashMap<TopicPartition, Long>();
+        this.fetched = new HashMap<TopicPartition, Long>();
+        this.committed = new HashMap<TopicPartition, Long>();
+        this.needsPartitionAssignment = false;
+    }
+
+    public void subscribe(String topic) {
+        if (this.subscribedPartitions.size() > 0)
+            throw new IllegalStateException("Subcription to topics and 
partitions are mutually exclusive");
+        if (!this.subscribedTopics.contains(topic)) {
+            this.subscribedTopics.add(topic);
+            this.needsPartitionAssignment = true;
+        }
+    }
+
+    public void unsubscribe(String topic) {
+        if (!this.subscribedTopics.contains(topic))
+            throw new IllegalStateException("Topic " + topic + " was never 
subscribed to.");
+        this.subscribedTopics.remove(topic);
+        this.needsPartitionAssignment = true;
+        for(TopicPartition tp: assignedPartitions())
+            if(topic.equals(tp.topic()))
+                clearPartition(tp);
+    }
+
+    public void subscribe(TopicPartition tp) {
+        if (this.subscribedTopics.size() > 0)
+            throw new IllegalStateException("Subcription to topics and 
partitions are mutually exclusive");
+        this.subscribedPartitions.add(tp);
+        this.assignedPartitions.add(tp);
+    }
+
+    public void unsubscribe(TopicPartition partition) {
+        if (!subscribedPartitions.contains(partition))
+            throw new IllegalStateException("Partition " + partition + " was 
never subscribed to.");
+        subscribedPartitions.remove(partition);
+        clearPartition(partition);
+    }
+    
+    private void clearPartition(TopicPartition tp) {
+        this.assignedPartitions.remove(tp);
+        this.committed.remove(tp);
+        this.fetched.remove(tp);
+        this.consumed.remove(tp);
+    }
+
+    public void clearAssignment() {
+        this.assignedPartitions.clear();
+        this.committed.clear();
+        this.fetched.clear();
+        this.needsPartitionAssignment = !subscribedTopics().isEmpty();
+    }
+
+    public Set<String> subscribedTopics() {
+        return this.subscribedTopics;
+    }
+
+    public Long fetched(TopicPartition tp) {
+        return this.fetched.get(tp);
+    }
+
+    public void fetched(TopicPartition tp, long offset) {
+        if (!this.assignedPartitions.contains(tp))
+            throw new IllegalArgumentException("Can't change the fetch 
position for a partition you are not currently subscribed to.");
+        this.fetched.put(tp, offset);
+    }
+
+    public void committed(TopicPartition tp, long offset) {
+        this.committed.put(tp, offset);
+    }
+
+    public Long committed(TopicPartition tp) {
+        return this.committed.get(tp);
+    }
+    
+    public void seek(TopicPartition tp, long offset) {
+        fetched(tp, offset);
+        consumed(tp, offset);
+    }
+
+    public Set<TopicPartition> assignedPartitions() {
+        return this.assignedPartitions;
+    }
+
+    public boolean partitionsAutoAssigned() {
+        return !this.subscribedTopics.isEmpty();
+    }
+
+    public void consumed(TopicPartition tp, long offset) {
+        if (!this.assignedPartitions.contains(tp))
+            throw new IllegalArgumentException("Can't change the consumed 
position for a partition you are not currently subscribed to.");
+        this.consumed.put(tp, offset);
+    }
+
+    public Long consumed(TopicPartition partition) {
+        return this.consumed.get(partition);
+    }
+
+    public Map<TopicPartition, Long> allConsumed() {
+        return this.consumed;
+    }
+
+    public boolean hasAllFetchPositions() {
+        return this.fetched.size() >= this.assignedPartitions.size();
+    }
+
+    public Set<TopicPartition> missingFetchPositions() {
+        Set<TopicPartition> copy = new HashSet<TopicPartition>(this.assignedPartitions);
+        copy.removeAll(this.fetched.keySet());
+        return copy;
+    }
+
+    public boolean needsPartitionAssignment() {
+        return this.needsPartitionAssignment;
+    }
+
+    public void changePartitionAssignment(List<TopicPartition> assignments) {
+        for (TopicPartition tp : assignments)
+            if (!this.subscribedTopics.contains(tp.topic()))
+                throw new IllegalArgumentException("Assigned partition " + tp 
+ " for non-subscribed topic.");
+        this.clearAssignment();
+        this.assignedPartitions.addAll(assignments);
+        this.needsPartitionAssignment = false;
+    }
+
+}
\ No newline at end of file
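
A short walkthrough of the intended state transitions (illustrative; java.util.Arrays import assumed):

    SubscriptionState state = new SubscriptionState();
    state.subscribe("events");                           // topic-level subscription
    state.needsPartitionAssignment();                    // true: waiting on the co-ordinator

    TopicPartition tp = new TopicPartition("events", 0);
    state.changePartitionAssignment(Arrays.asList(tp));  // assigns events-0, clears the flag
    state.seek(tp, 42L);                                 // sets both the fetched and consumed positions
    state.fetched(tp);                                   // 42
    state.consumed(tp);                                  // 42
    state.committed(tp, 42L);                            // record a broker-acknowledged commit
    state.hasAllFetchPositions();                        // true: one assigned partition, one fetch position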

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index fc71710..ebc4c53 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -329,8 +329,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
                         " to class " + 
producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() 
+
                         " specified in value.serializer");
             }
-            ProducerRecord<byte[], byte[]> serializedRecord = new 
ProducerRecord<byte[], byte[]>(record.topic(), record.partition(), 
serializedKey, serializedValue);
-            int partition = partitioner.partition(serializedRecord, 
metadata.fetch());
+            int partition = partitioner.partition(record.topic(), 
serializedKey, record.partition(), metadata.fetch());
             int serializedSize = Records.LOG_OVERHEAD + 
Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 904976f..84530f2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -100,7 +100,7 @@ public class MockProducer implements Producer<byte[], byte[]> {
     public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
         int partition = 0;
         if (this.cluster.partitionsForTopic(record.topic()) != null)
-            partition = partitioner.partition(record, this.cluster);
+            partition = partitioner.partition(record.topic(), record.key(), record.partition(), this.cluster);
         ProduceRequestResult result = new ProduceRequestResult();
         FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
         TopicPartition topicPartition = new TopicPartition(record.topic(), partition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 8b3e565..9a43d66 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -16,9 +16,9 @@ import static 
org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
-import java.util.Arrays;
 import java.util.Map;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -38,23 +38,17 @@ public class ProducerConfig extends AbstractConfig {
     private static final ConfigDef config;
 
     /** <code>bootstrap.servers</code> */
-    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
-    private static final String BOOSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Data will be load " + "balanced over all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only "
-                                                       + "impacts the initial hosts used to discover the full set of servers. This list should be in the form "
-                                                       + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to "
-                                                       + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
-                                                       + "servers (you may want more than one, though, in case a server is down). If no server in this list is available sending "
-                                                       + "data will fail until on becomes available.";
+    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
     /** <code>metadata.fetch.timeout.ms</code> */
     public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
-    private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the " + "topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata "
+    private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata "
                                                              + "fetch to succeed before throwing an exception back to the client.";
 
     /** <code>metadata.max.age.ms</code> */
-    public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
-    private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + " partition leadership changes to proactively discover any new brokers or partitions.";
-
+    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+    private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
+
     /** <code>batch.size</code> */
     public static final String BATCH_SIZE_CONFIG = "batch.size";
     private static final String BATCH_SIZE_DOC = "The producer will attempt to 
batch records together into fewer requests whenever multiple records are being 
sent" + " to the same partition. This helps performance on both the client and 
the server. This configuration controls the "
@@ -113,17 +107,13 @@ public class ProducerConfig extends AbstractConfig {
                                                 + "for example, would have the 
effect of reducing the number of requests sent but would add up to 5ms of 
latency to records sent in the absense of load.";
 
     /** <code>client.id</code> */
-    public static final String CLIENT_ID_CONFIG = "client.id";
-    private static final String CLIENT_ID_DOC = "The id string to pass to the server when making requests. The purpose of this is to be able to track the source " + "of requests beyond just ip/port by allowing a logical application name to be included with the request. The "
-                                                + "application can set any string it wants as this has no functional purpose other than in logging and metrics.";
+    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
 
     /** <code>send.buffer.bytes</code> */
-    public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
-    private static final String SEND_BUFFER_DOC = "The size of the TCP send buffer to use when sending data";
+    public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
 
     /** <code>receive.buffer.bytes</code> */
-    public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
-    private static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer to use when reading data";
+    public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
 
     /** <code>max.request.size</code> */
     public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
@@ -131,8 +121,7 @@ public class ProducerConfig extends AbstractConfig {
                                                        + "batches the producer 
will send in a single request to avoid sending huge requests.";
 
     /** <code>reconnect.backoff.ms</code> */
-    public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
-    private static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time to wait before attempting to reconnect to a given host when a connection fails." + " This avoids a scenario where the client repeatedly attempts to connect to a host in a tight loop.";
+    public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
 
     /** <code>block.on.buffer.full</code> */
     public static final String BLOCK_ON_BUFFER_FULL_CONFIG = 
"block.on.buffer.full";
@@ -147,8 +136,7 @@ public class ProducerConfig extends AbstractConfig {
                                               + "may appear first.";
 
     /** <code>retry.backoff.ms</code> */
-    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
-    private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed produce request to a given topic partition." + " This avoids repeated sending-and-failing in a tight loop.";
+    public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
 
     /** <code>compression.type</code> */
     public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
@@ -156,17 +144,13 @@ public class ProducerConfig extends AbstractConfig {
                                                        + "Compression is of 
full batches of data, so the efficacy of batching will also impact the 
compression ratio (more batching means better compression).";
 
     /** <code>metrics.sample.window.ms</code> */
-    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
-    private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. "
-                                                               + "When a window expires we erase and overwrite the oldest window.";
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
 
     /** <code>metrics.num.samples</code> */
-    public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
-    private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";
+    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
 
     /** <code>metric.reporters</code> */
-    public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
-    private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
+    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
     /** <code>max.in.flight.requests.per.connection</code> */
     public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 
"max.in.flight.requests.per.connection";
@@ -183,22 +167,22 @@ public class ProducerConfig extends AbstractConfig {
     private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class 
for value that implements the <code>Serializer</code> interface.";
 
     static {
-        config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
+        config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
                                 .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                                 .define(ACKS_CONFIG,
                                         Type.STRING,
                                         "1",
-                                        in(Arrays.asList("all", "-1", "0", "1")),
+                                        in("all","-1", "0", "1"),
                                         Importance.HIGH,
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
                                 .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
                                 .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
                                 .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
-                                .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
-                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC)
-                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
+                                .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC)
+                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC)
+                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC)
                                 .define(MAX_REQUEST_SIZE_CONFIG,
                                         Type.INT,
                                         1 * 1024 * 1024,
@@ -206,9 +190,9 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.MEDIUM,
                                         MAX_REQUEST_SIZE_DOC)
                                 .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
-                                .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, RECONNECT_BACKOFF_MS_DOC)
-                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
-                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC)
+                                .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
                                 .define(METADATA_FETCH_TIMEOUT_CONFIG,
                                         Type.LONG,
                                         60 * 1000,
@@ -221,8 +205,8 @@ public class ProducerConfig extends AbstractConfig {
                                         30000,
                                         atLeast(0),
                                         Importance.LOW,
-                                        METRICS_SAMPLE_WINDOW_MS_DOC)
-                                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
+                                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
                                 .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
                                         Type.INT,
                                         5,

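Two things are worth noting in this hunk: the shared keys and doc strings now come from CommonClientConfigs, and the default reconnect backoff quietly moves from 10ms to 50ms. Client code configures the producer exactly as before, e.g. (illustrative snippet):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all");             // validated against in("all", "-1", "0", "1")
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
    // reconnect.backoff.ms now defaults to 50ms instead of 10ms unless overridden
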
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index dcf4658..3aff624 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -78,9 +78,9 @@ public final class Metadata {
     }
 
     /**
-     * The next time to update the cluster info is the maximum of the time the current info will expire
-     * and the time the current info can be updated (i.e. backoff time has elapsed); If an update has
-     * been request then the expiry time is now
+     * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
+     * current info can be updated (i.e. backoff time has elapsed); if an update has been requested then the expiry time
+     * is now
      */
     public synchronized long timeToNextUpdate(long nowMs) {
         long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
@@ -120,6 +120,15 @@ public final class Metadata {
     }
 
     /**
+     * Add one or more topics to maintain metadata for
+     */
+    public synchronized void addTopics(String... topics) {
+        for (String topic : topics)
+            this.topics.add(topic);
+        requestUpdate();
+    }
+
+    /**
      * Get the list of topics we are currently maintaining metadata for
      */
     public synchronized Set<String> topics() {
@@ -137,6 +146,13 @@ public final class Metadata {
         notifyAll();
         log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
     }
+    
+    /**
+     * @return The current metadata version
+     */
+    public synchronized int version() {
+        return this.version;
+    }
 
     /**
      * The last time metadata was updated.

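The two additions give callers (the new consumer in particular) a way to register topics for metadata tracking and to observe refresh progress. A hypothetical caller sketch (the no-arg Metadata constructor is assumed; it is not shown in this hunk):

    Metadata metadata = new Metadata();      // constructor assumed, not shown here
    int before = metadata.version();
    metadata.addTopics("events", "audit");   // registers the topics and calls requestUpdate()
    // once a metadata response is processed, update(cluster, now) bumps the version,
    // so a waiter can detect progress by comparing metadata.version() against 'before'
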
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
index 483899d..8112e6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
@@ -20,7 +20,6 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.utils.Utils;
@@ -40,32 +39,34 @@ public class Partitioner {
     /**
      * Compute the partition for the given record.
      * 
-     * @param record The record being sent
+     * @param topic The topic name
+     * @param key The key to partition on (or null if no key)
+     * @param partition The partition to use (or null if none)
      * @param cluster The current cluster metadata
      */
-    public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
-        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
+    public int partition(String topic, byte[] key, Integer partition, Cluster cluster) {
+        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
         int numPartitions = partitions.size();
-        if (record.partition() != null) {
+        if (partition != null) {
             // they have given us a partition, use it
-            if (record.partition() < 0 || record.partition() >= numPartitions)
-                throw new IllegalArgumentException("Invalid partition given 
with record: " + record.partition()
+            if (partition < 0 || partition >= numPartitions)
+                throw new IllegalArgumentException("Invalid partition given 
with record: " + partition
                                                    + " is not in the range 
[0..."
                                                    + numPartitions
                                                    + "].");
-            return record.partition();
-        } else if (record.key() == null) {
+            return partition;
+        } else if (key == null) {
             // choose the next available node in a round-robin fashion
             for (int i = 0; i < numPartitions; i++) {
-                int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
-                if (partitions.get(partition).leader() != null)
-                    return partition;
+                int part = Utils.abs(counter.getAndIncrement()) % numPartitions;
+                if (partitions.get(part).leader() != null)
+                    return part;
             }
             // no partitions are available, give a non-available partition
             return Utils.abs(counter.getAndIncrement()) % numPartitions;
         } else {
             // hash the key to choose a partition
-            return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
+            return Utils.abs(Utils.murmur2(key)) % numPartitions;
         }
     }
 

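The refactored signature decouples partitioning from ProducerRecord, which is what lets KafkaProducer above pass the already-serialized key. The selection logic itself is unchanged; a sketch of the three branches (assumes a Cluster instance in scope whose topic "t" has three partitions):

    Partitioner partitioner = new Partitioner();

    // 1. an explicit partition wins, after a range check against the partition count
    int p0 = partitioner.partition("t", "k".getBytes(), 2, cluster);  // always 2

    // 2. null key: round-robin over partitions that currently have a leader
    int p1 = partitioner.partition("t", null, null, cluster);

    // 3. keyed: murmur2 hash of the serialized key, stable across calls
    int p2 = partitioner.partition("t", "k".getBytes(), null, cluster);
    int p3 = partitioner.partition("t", "k".getBytes(), null, cluster);
    assert p2 == p3;
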
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index ccc03d8..8726809 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -146,7 +147,8 @@ public class Sender implements Runnable {
     /**
      * Run a single iteration of sending
      * 
-     * @param now The current POSIX time in milliseconds
+     * @param now
+     *            The current POSIX time in milliseconds
      */
     public void run(long now) {
         Cluster cluster = metadata.fetch();
@@ -169,9 +171,12 @@ public class Sender implements Runnable {
         }
 
         // create produce requests
-        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
+        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
+                                                                         result.readyNodes,
+                                                                         this.maxRequestSize,
+                                                                         now);
+        sensors.updateProduceRequestMetrics(batches);
         List<ClientRequest> requests = createProduceRequests(batches, now);
-        sensors.updateProduceRequestMetrics(requests);
 
         // If we have any nodes that are ready to send + have sendable data, 
poll with 0 timeout so this can immediately
         // loop and try sending more data. Otherwise, the timeout is 
determined by nodes that have partitions with data
@@ -183,18 +188,14 @@ public class Sender implements Runnable {
             log.trace("Created {} produce requests: {}", requests.size(), 
requests);
             pollTimeout = 0;
         }
+        for (ClientRequest request : requests)
+            client.send(request);
 
         // if some partitions are already ready to be sent, the select time would be 0;
         // otherwise if some partition already has some data accumulated but not ready yet,
         // the select time will be the time difference between now and its linger expiry time;
         // otherwise the select time will be the time difference between now and the metadata expiry time;
-        List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
-        for (ClientResponse response : responses) {
-            if (response.wasDisconnected())
-                handleDisconnect(response, now);
-            else
-                handleResponse(response, now);
-        }
+        this.client.poll(pollTimeout, now);
     }
 
     /**
@@ -206,45 +207,44 @@ public class Sender implements Runnable {
         this.wakeup();
     }
 
-    private void handleDisconnect(ClientResponse response, long now) {
-        log.trace("Cancelled request {} due to node {} being disconnected", response, response.request().request().destination());
-        int correlation = response.request().request().header().correlationId();
-        @SuppressWarnings("unchecked")
-        Map<TopicPartition, RecordBatch> responseBatches = (Map<TopicPartition, RecordBatch>) response.request().attachment();
-        for (RecordBatch batch : responseBatches.values())
-            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now);
-    }
-
     /**
      * Handle a produce response
      */
-    private void handleResponse(ClientResponse response, long now) {
+    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
         int correlationId = response.request().request().header().correlationId();
-        log.trace("Received produce response from node {} with correlation id 
{}",
-                  response.request().request().destination(),
-                  correlationId);
-        @SuppressWarnings("unchecked")
-        Map<TopicPartition, RecordBatch> batches = (Map<TopicPartition, 
RecordBatch>) response.request().attachment();
-        // if we have a response, parse it
-        if (response.hasResponse()) {
-            ProduceResponse produceResponse = new 
ProduceResponse(response.responseBody());
-            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> 
entry : produceResponse.responses().entrySet()) {
-                TopicPartition tp = entry.getKey();
-                ProduceResponse.PartitionResponse partResp = entry.getValue();
-                Errors error = Errors.forCode(partResp.errorCode);
-                RecordBatch batch = batches.get(tp);
-                completeBatch(batch, error, partResp.baseOffset, 
correlationId, now);
-            }
-            
this.sensors.recordLatency(response.request().request().destination(), 
response.requestLatencyMs());
-        } else {
-            // this is the acks = 0 case, just complete all requests
+        if (response.wasDisconnected()) {
+            log.trace("Cancelled request {} due to node {} being 
disconnected", response, response.request()
+                                                                               
                   .request()
+                                                                               
                   .destination());
             for (RecordBatch batch : batches.values())
-                completeBatch(batch, Errors.NONE, -1L, correlationId, now);
+                completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, 
correlationId, now);
+        } else {
+            log.trace("Received produce response from node {} with correlation 
id {}",
+                      response.request().request().destination(),
+                      correlationId);
+            // if we have a response, parse it
+            if (response.hasResponse()) {
+                ProduceResponse produceResponse = new 
ProduceResponse(response.responseBody());
+                for (Map.Entry<TopicPartition, 
ProduceResponse.PartitionResponse> entry : produceResponse.responses()
+                                                                               
                          .entrySet()) {
+                    TopicPartition tp = entry.getKey();
+                    ProduceResponse.PartitionResponse partResp = 
entry.getValue();
+                    Errors error = Errors.forCode(partResp.errorCode);
+                    RecordBatch batch = batches.get(tp);
+                    completeBatch(batch, error, partResp.baseOffset, 
correlationId, now);
+                }
+                
this.sensors.recordLatency(response.request().request().destination(), 
response.requestLatencyMs());
+            } else {
+                // this is the acks = 0 case, just complete all requests
+                for (RecordBatch batch : batches.values())
+                    completeBatch(batch, Errors.NONE, -1L, correlationId, now);
+            }
         }
     }
 
     /**
      * Complete or retry the given batch of records.
+     * 
      * @param batch The record batch
      * @param error The error (or null if none)
      * @param baseOffset The base offset assigned to the records if successful
@@ -294,7 +294,7 @@ public class Sender implements Runnable {
      */
     private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
         Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
-        Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
+        final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
         for (RecordBatch batch : batches) {
             TopicPartition tp = batch.topicPartition;
             ByteBuffer recordsBuffer = batch.records.buffer();
@@ -303,8 +303,15 @@ public class Sender implements Runnable {
             recordsByPartition.put(tp, batch);
         }
         ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
-        RequestSend send = new RequestSend(destination, this.client.nextRequestHeader(ApiKeys.PRODUCE), request.toStruct());
-        return new ClientRequest(now, acks != 0, send, recordsByPartition);
+        RequestSend send = new RequestSend(destination,
+                                           this.client.nextRequestHeader(ApiKeys.PRODUCE),
+                                           request.toStruct());
+        RequestCompletionHandler callback = new RequestCompletionHandler() {
+            public void onComplete(ClientResponse response) {
+                handleProduceResponse(response, recordsByPartition, time.milliseconds());
+            }
+        };
+        return new ClientRequest(now, acks != 0, send, callback);
     }
 
     /**
@@ -428,44 +435,38 @@ public class Sender implements Runnable {
             }
         }
 
-        public void updateProduceRequestMetrics(List<ClientRequest> requests) {
+        public void updateProduceRequestMetrics(Map<Integer, List<RecordBatch>> batches) {
             long now = time.milliseconds();
-            for (int i = 0; i < requests.size(); i++) {
-                ClientRequest request = requests.get(i);
+            for (List<RecordBatch> nodeBatch : batches.values()) {
                 int records = 0;
-
-                if (request.attachment() != null) {
-                    Map<TopicPartition, RecordBatch> responseBatches = (Map<TopicPartition, RecordBatch>) request.attachment();
-                    for (RecordBatch batch : responseBatches.values()) {
-
-                        // register all per-topic metrics at once
-                        String topic = batch.topicPartition.topic();
-                        maybeRegisterTopicMetrics(topic);
-
-                        // per-topic record send rate
-                        String topicRecordsCountName = "topic." + topic + ".records-per-batch";
-                        Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName));
-                        topicRecordCount.record(batch.recordCount);
-
-                        // per-topic bytes send rate
-                        String topicByteRateName = "topic." + topic + ".bytes";
-                        Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName));
-                        topicByteRate.record(batch.records.sizeInBytes());
-
-                        // per-topic compression rate
-                        String topicCompressionRateName = "topic." + topic + ".compression-rate";
-                        Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName));
-                        topicCompressionRate.record(batch.records.compressionRate());
-
-                        // global metrics
-                        this.batchSizeSensor.record(batch.records.sizeInBytes(), now);
-                        this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
-                        this.compressionRateSensor.record(batch.records.compressionRate());
-                        this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
-                        records += batch.recordCount;
-                    }
-                    this.recordsPerRequestSensor.record(records, now);
+                for (RecordBatch batch : nodeBatch) {
+                    // register all per-topic metrics at once
+                    String topic = batch.topicPartition.topic();
+                    maybeRegisterTopicMetrics(topic);
+
+                    // per-topic record send rate
+                    String topicRecordsCountName = "topic." + topic + ".records-per-batch";
+                    Sensor topicRecordCount = Utils.notNull(this.metrics.getSensor(topicRecordsCountName));
+                    topicRecordCount.record(batch.recordCount);
+
+                    // per-topic bytes send rate
+                    String topicByteRateName = "topic." + topic + ".bytes";
+                    Sensor topicByteRate = Utils.notNull(this.metrics.getSensor(topicByteRateName));
+                    topicByteRate.record(batch.records.sizeInBytes());
+
+                    // per-topic compression rate
+                    String topicCompressionRateName = "topic." + topic + ".compression-rate";
+                    Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName));
+                    topicCompressionRate.record(batch.records.compressionRate());
+
+                    // global metrics
+                    this.batchSizeSensor.record(batch.records.sizeInBytes(), now);
+                    this.queueTimeSensor.record(batch.drainedMs - batch.createdMs, now);
+                    this.compressionRateSensor.record(batch.records.compressionRate());
+                    this.maxRecordSizeSensor.record(batch.maxRecordSize, now);
+                    records += batch.recordCount;
                 }
+                this.recordsPerRequestSensor.record(records, now);
             }
         }
         }
 

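Note on the Sender refactor above: responses are no longer routed by the Sender itself from the List<ClientResponse> that poll() used to return. Instead each ClientRequest carries a RequestCompletionHandler, and the client invokes it when the request completes (or its node disconnects). A minimal sketch of the resulting pattern, where send, recordsByPartition, acks, pollTimeout and now stand in for the values built inside produceRequest() and run():

    // the batches travel in the callback's closure, so the unchecked cast
    // of request.attachment() on the response path is no longer needed
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    ClientRequest request = new ClientRequest(now, acks != 0, send, callback);
    client.send(request);          // queue the request
    client.poll(pollTimeout, now); // do I/O; completed requests fire their callbacks
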
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index d3299b9..d7ccbcd 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -26,6 +26,7 @@ public final class Cluster {
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
     private final Map<String, List<PartitionInfo>> partitionsByTopic;
     private final Map<Integer, List<PartitionInfo>> partitionsByNode;
+    private final Map<Integer, Node> nodesById;
 
     /**
      * Create a new cluster with the given nodes and partitions
@@ -37,6 +38,10 @@ public final class Cluster {
         List<Node> copy = new ArrayList<Node>(nodes);
         Collections.shuffle(copy);
         this.nodes = Collections.unmodifiableList(copy);
+        
+        this.nodesById = new HashMap<Integer, Node>();
+        for(Node node: nodes)
+          this.nodesById.put(node.id(), node);
 
         // index the partitions by topic/partition for quick lookup
         this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
@@ -97,6 +102,15 @@ public final class Cluster {
     public List<Node> nodes() {
         return this.nodes;
     }
+    
+    /**
+     * Get the node by the node id (or null if no such node exists)
+     * @param id The id of the node
+     * @return The node, or null if no such node exists
+     */
+    public Node nodeById(int id) {
+        return this.nodesById.get(id);
+    }
 
     /**
      * Get the current leader for the given topic-partition

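The new nodesById index makes broker lookup by id a constant-time map access rather than a scan of nodes(). A minimal sketch, assuming a Cluster instance named cluster built from current metadata (broker id 0 is illustrative):

    Node node = cluster.nodeById(0); // O(1); null if broker 0 is not in the metadata
    if (node == null)
        System.out.println("broker 0 unknown; metadata may be stale");
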
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index b15aa2c..28562f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -69,10 +69,10 @@ public class PartitionInfo {
 
     @Override
     public String toString() {
-        return String.format("Partition(topic = %s, partition = %d, leader = %d, replicas = %s, isr = %s",
+        return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s",
                              topic,
                              partition,
-                             leader.id(),
+                             leader == null? "none" : leader.id(),
                              fmtNodeIds(replicas),
                              fmtNodeIds(inSyncReplicas));
     }

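With the null guard, formatting a partition that currently has no leader prints "none" instead of throwing a NullPointerException. A small sketch, assuming the five-argument constructor (topic, partition, leader, replicas, isr):

    PartitionInfo info = new PartitionInfo("my-topic", 0, null, new Node[0], new Node[0]);
    System.out.println(info); // leader = none, rather than an NPE from leader.id()
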
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 98cb79b..38ce10b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -21,6 +21,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.kafka.common.utils.Utils;
+
 /**
  * This class is used for specifying the set of expected configurations, their type, their defaults, their
  * documentation, and any special validation logic used for checking the correctness of the values the user provides.
@@ -292,39 +294,23 @@ public class ConfigDef {
       this.validStrings = validStrings;
     }
 
-    public static ValidString in(List<String> validStrings) {
-      return new ValidString(validStrings);
+    public static ValidString in(String... validStrings) {
+      return new ValidString(Arrays.asList(validStrings));
     }
 
     @Override
     public void ensureValid(String name, Object o) {
-
       String s = (String) o;
-
       if (!validStrings.contains(s)) {
-        throw new ConfigException(name,o,"String must be one of:" +join(validStrings));
+        throw new ConfigException(name,o,"String must be one of: " + Utils.join(validStrings, ", "));
       }
 
     }
 
     public String toString() {
-      return "[" + join(validStrings) + "]";
+      return "[" + Utils.join(validStrings, ", ") + "]";
     }
 
-    private String join(List<String> list)
-    {
-      StringBuilder sb = new StringBuilder();
-      boolean first = true;
-      for (String item : list)
-      {
-        if (first)
-          first = false;
-        else
-          sb.append(",");
-        sb.append(item);
-      }
-      return sb.toString();
-    }
   }
 
     private static class ConfigKey {

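ValidString.in() now takes varargs and reuses Utils.join() instead of a private helper, so validators can be declared inline. A hedged sketch of defining a validated string config with the new signature (the config name, values, and documentation are made up for illustration):

    ConfigDef def = new ConfigDef().define("compression.type",
                                           ConfigDef.Type.STRING,
                                           "none",
                                           ConfigDef.ValidString.in("none", "gzip", "snappy"),
                                           ConfigDef.Importance.MEDIUM,
                                           "The compression type for generated data.");
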
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java b/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java
index 7c948b1..a566b90 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ApiException.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.KafkaException;
  * Any API exception that is part of the public protocol should be a subclass of this class and be part of this
  * package.
  */
-public abstract class ApiException extends KafkaException {
+public class ApiException extends KafkaException {
 
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index b68bbf0..b5f8d83 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.network;
 
@@ -51,13 +47,17 @@ public interface Selectable {
     public void close();
 
     /**
-     * Initiate any sends provided, and make progress on any other I/O operations in-flight (connections,
-     * disconnections, existing sends, and receives)
+     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
+     * @param send The request to send
+     */
+    public void send(NetworkSend send);
+
+    /**
+     * Do I/O. Reads, writes, connection establishment, etc.
      * @param timeout The amount of time to block if there is nothing to do
-     * @param sends The new sends to initiate
      * @throws IOException
      */
-    public void poll(long timeout, List<NetworkSend> sends) throws IOException;
+    public void poll(long timeout) throws IOException;
 
     /**
      * The list of sends that completed on the last {@link #poll(long, List) poll()} call.
@@ -81,4 +81,26 @@ public interface Selectable {
      */
     public List<Integer> connected();
 
+    /**
+     * Disable reads from the given connection
+     * @param id The id for the connection
+     */
+    public void mute(int id);
+
+    /**
+     * Re-enable reads from the given connection
+     * @param id The id for the connection
+     */
+    public void unmute(int id);
+
+    /**
+     * Disable reads from all connections
+     */
+    public void muteAll();
+
+    /**
+     * Re-enable reads from all connections
+     */
+    public void unmuteAll();
+
 }
\ No newline at end of file

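The interface change splits queuing a send from performing I/O: callers hand sends to send() and then a single poll() drives writes, reads, connects and disconnects, while mute()/unmute() suspend and resume reads per connection. A rough sketch of the resulting loop against the interface (networkSend, timeoutMs, nodeId, and the handler are illustrative):

    selectable.send(networkSend);   // queue only; the write happens inside poll()
    selectable.poll(timeoutMs);     // make progress on all in-flight I/O
    for (NetworkSend completed : selectable.completedSends())
        onSendCompleted(completed); // hypothetical completion handler
    selectable.mute(nodeId);        // optionally stop reading from one connection
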
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 74d695b..e18a769 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.common.network;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -81,6 +82,7 @@ public class Selector implements Selectable {
     private final List<NetworkReceive> completedReceives;
     private final List<Integer> disconnected;
     private final List<Integer> connected;
+    private final List<Integer> failedSends;
     private final Time time;
     private final SelectorMetrics sensors;
     private final String metricGrpPrefix;
@@ -103,6 +105,7 @@ public class Selector implements Selectable {
         this.completedReceives = new ArrayList<NetworkReceive>();
         this.connected = new ArrayList<Integer>();
         this.disconnected = new ArrayList<Integer>();
+        this.failedSends = new ArrayList<Integer>();
         this.sensors = new SelectorMetrics(metrics);
     }
 
@@ -179,10 +182,26 @@ public class Selector implements Selectable {
     }
 
     /**
+     * Queue the given request for sending in the subsequent {@link #poll(long)} calls
+     * @param send The request to send
+     */
+    public void send(NetworkSend send) {
+        SelectionKey key = keyForId(send.destination());
+        Transmissions transmissions = transmissions(key);
+        if (transmissions.hasSend())
+            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
+        transmissions.send = send;
+        try {
+            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+        } catch (CancelledKeyException e) {
+            close(key);
+            this.failedSends.add(send.destination());
+        }
+    }
+
+    /**
      * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
      * disconnections, initiating new sends, or making progress on in-progress sends or receives.
-     * <p>
-     * The provided network sends will be started.
      * 
      * When this call is completed the user can check for completed sends, receives, connections or disconnects using
      * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These
@@ -190,29 +209,13 @@ public class Selector implements Selectable {
      * completed I/O.
      * 
      * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely.
-     * @param sends The list of new sends to begin
-     * 
      * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
      *         already an in-progress send
      */
     @Override
-    public void poll(long timeout, List<NetworkSend> sends) throws IOException {
+    public void poll(long timeout) throws IOException {
         clear();
 
-        /* register for write interest on any new sends */
-        for (NetworkSend send : sends) {
-            SelectionKey key = keyForId(send.destination());
-            Transmissions transmissions = transmissions(key);
-            if (transmissions.hasSend())
-                throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
-            transmissions.send = send;
-            try {
-                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-            } catch (CancelledKeyException e) {
-                close(key);
-            }
-        }
-
         /* check ready keys */
         long startSelect = time.nanoseconds();
         int readyKeys = select(timeout);
@@ -266,21 +269,34 @@ public class Selector implements Selectable {
                     }
 
                     /* cancel any defunct sockets */
-                    if (!key.isValid())
+                    if (!key.isValid()) {
                         close(key);
+                        this.disconnected.add(transmissions.id);
+                    }
                 } catch (IOException e) {
-                    InetAddress remoteAddress = null;
-                    Socket socket = channel.socket();
-                    if (socket != null)
-                        remoteAddress = socket.getInetAddress();
-                    log.warn("Error in I/O with {}", remoteAddress , e);
+                    String desc = socketDescription(channel);
+                    if(e instanceof EOFException)
+                        log.info("Connection {} disconnected", desc);
+                    else
+                        log.warn("Error in I/O with connection to {}", desc, e);
                     close(key);
+                    this.disconnected.add(transmissions.id);
                 }
             }
         }
         long endIo = time.nanoseconds();
         this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
     }
+    
+    private String socketDescription(SocketChannel channel) {
+        Socket socket = channel.socket();
+        if(socket == null)
+            return "[unconnected socket]";
+        else if(socket.getInetAddress() != null)
+            return socket.getInetAddress().toString();
+        else
+            return socket.getLocalAddress().toString();
+    }
 
     @Override
     public List<NetworkSend> completedSends() {
@@ -302,6 +318,36 @@ public class Selector implements Selectable {
         return this.connected;
     }
 
+    @Override
+    public void mute(int id) {
+        mute(this.keyForId(id));
+    }
+
+    private void mute(SelectionKey key) {
+        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
+    }
+
+    @Override
+    public void unmute(int id) {
+        unmute(this.keyForId(id));
+    }
+
+    private void unmute(SelectionKey key) {
+        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+    }
+
+    @Override
+    public void muteAll() {
+        for (SelectionKey key : this.keys.values())
+            mute(key);
+    }
+
+    @Override
+    public void unmuteAll() {
+        for (SelectionKey key : this.keys.values())
+            unmute(key);
+    }
+
     /**
      * Clear the results from the prior poll
      */
@@ -310,6 +356,8 @@ public class Selector implements Selectable {
         this.completedReceives.clear();
         this.connected.clear();
         this.disconnected.clear();
+        this.disconnected.addAll(this.failedSends);
+        this.failedSends.clear();
     }
 
     /**
@@ -335,7 +383,6 @@ public class Selector implements Selectable {
         SocketChannel channel = channel(key);
         Transmissions trans = transmissions(key);
         if (trans != null) {
-            this.disconnected.add(trans.id);
             this.keys.remove(trans.id);
             trans.clearReceive();
             trans.clearSend();

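A subtle bookkeeping change above: close() no longer adds the connection to disconnected() itself. Disconnects are now recorded in the poll loop where they are detected, and a send() that hits a CancelledKeyException is stored in failedSends, which clear() folds into disconnected() at the start of the next poll(). Callers therefore see failed sends as ordinary disconnects, roughly (nodeId, payload, and the recovery call are illustrative):

    selector.send(new NetworkSend(nodeId, payload)); // may fail internally on a cancelled key
    selector.poll(0);                                // failed sends surface on this poll...
    if (selector.disconnected().contains(nodeId))
        reconnectOrFail(nodeId);                     // ...through the normal disconnect channel
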
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 3316b6a..a8deac4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -19,36 +19,62 @@ package org.apache.kafka.common.protocol;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.kafka.common.errors.*;
-
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.NotLeaderForPartitionException;
+import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 
 /**
  * This class contains all the client-server errors--those errors that must be sent from the server to the client. These
  * are thus part of the protocol. The names can be changed but the error code cannot.
- *
+ * 
  * Do not add exceptions that occur only on the client or only on the server here.
  */
 public enum Errors {
     UNKNOWN(-1, new UnknownServerException("The server experienced an unexpected error when processing the request")),
     NONE(0, null),
-    OFFSET_OUT_OF_RANGE(1, new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
-    CORRUPT_MESSAGE(2, new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
-    UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
+    OFFSET_OUT_OF_RANGE(1,
+            new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
+    CORRUPT_MESSAGE(2,
+            new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
+    UNKNOWN_TOPIC_OR_PARTITION(3,
+            new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
     // TODO: errorCode 4 for InvalidFetchSize
-    LEADER_NOT_AVAILABLE(5, new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
-    NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
+    LEADER_NOT_AVAILABLE(5,
+            new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
+    NOT_LEADER_FOR_PARTITION(6,
+            new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
     REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
-    // TODO: errorCode 8, 9, 11
-    MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
+    MESSAGE_TOO_LARGE(10,
+            new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
     OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
     NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")),
-    // TODO: errorCode 14, 15, 16
-    INVALID_TOPIC_EXCEPTION(17, new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
-    RECORD_LIST_TOO_LARGE(18, new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")),
-    NOT_ENOUGH_REPLICAS(19, new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
-    NOT_ENOUGH_REPLICAS_AFTER_APPEND(20, new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required."));
+    OFFSET_LOAD_IN_PROGRESS(14, new ApiException("The coordinator is loading offsets and can't process requests.")),
+    CONSUMER_COORDINATOR_NOT_AVAILABLE(15, new ApiException("The coordinator is not available.")),
+    NOT_COORDINATOR_FOR_CONSUMER(16, new ApiException("This is not the correct co-ordinator for this consumer.")),
+    INVALID_TOPIC_EXCEPTION(17,
+            new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
+    RECORD_LIST_TOO_LARGE(18,
+            new RecordBatchTooLargeException("The request included message batch larger than the configured segment size on the server.")),
+    NOT_ENOUGH_REPLICAS(19,
+            new NotEnoughReplicasException("Messages are rejected since there are fewer in-sync replicas than required.")),
+    NOT_ENOUGH_REPLICAS_AFTER_APPEND(20,
+            new NotEnoughReplicasAfterAppendException("Messages are written to the log, but to fewer in-sync replicas than required."));
+
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
+
     static {
         for (Errors error : Errors.values()) {
             codeToError.put(error.code(), error);
@@ -84,8 +110,9 @@ public enum Errors {
      * Throw the exception corresponding to this error if there is one
      */
     public void maybeThrow() {
-        if (exception != null)
+        if (exception != null) {
             throw this.exception;
+        }
     }
 
     /**

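The formerly reserved codes 14-16 now map to plain ApiException instances, which is why ApiException was made concrete in the change above rather than abstract. Decoding a wire error code follows the existing pattern:

    short code = 15; // e.g. taken from a response struct
    Errors error = Errors.forCode(code); // CONSUMER_COORDINATOR_NOT_AVAILABLE
    if (error != Errors.NONE)
        error.maybeThrow(); // throws the mapped ApiException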