http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 176571c..8ec8b75 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -26,13 +26,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
@@ -79,11 +79,12 @@ public class ConsumerCoordinatorTest {
     private String topicName = "test";
     private String groupId = "test-group";
     private TopicPartition tp = new TopicPartition(topicName, 0);
+    private int rebalanceTimeoutMs = 60000;
     private int sessionTimeoutMs = 10000;
     private int heartbeatIntervalMs = 5000;
     private long retryBackoffMs = 100;
     private boolean autoCommitEnabled = false;
-    private long autoCommitIntervalMs = 2000;
+    private int autoCommitIntervalMs = 2000;
     private MockPartitionAssignor partitionAssignor = new 
MockPartitionAssignor();
     private List<PartitionAssignor> assignors = 
Collections.<PartitionAssignor>singletonList(partitionAssignor);
     private MockTime time;
@@ -123,7 +124,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testNormalHeartbeat() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // normal heartbeat
@@ -141,7 +142,7 @@ public class ConsumerCoordinatorTest {
 
     @Test(expected = GroupAuthorizationException.class)
     public void testGroupDescribeUnauthorized() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.GROUP_AUTHORIZATION_FAILED.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.GROUP_AUTHORIZATION_FAILED.code()));
         coordinator.ensureCoordinatorReady();
     }
 
@@ -149,17 +150,17 @@ public class ConsumerCoordinatorTest {
     public void testGroupReadUnauthorized() {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupLeaderResponse(0, "memberId", 
Collections.<String, List<String>>emptyMap(),
                 Errors.GROUP_AUTHORIZATION_FAILED.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
     }
 
     @Test
     public void testCoordinatorNotAvailable() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
@@ -180,7 +181,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testNotCoordinator() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // not_coordinator will mark coordinator as unknown
@@ -201,7 +202,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testIllegalGeneration() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // illegal_generation will cause re-partition
@@ -225,7 +226,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testUnknownConsumerId() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // illegal_generation will cause re-partition
@@ -249,7 +250,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCoordinatorDisconnect() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // coordinator disconnect will mark coordinator as unknown
@@ -279,12 +280,12 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(Arrays.asList(topicName));
         metadata.update(cluster, time.milliseconds());
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupLeaderResponse(0, consumerId, 
Collections.<String, List<String>>emptyMap(),
                 Errors.INVALID_GROUP_ID.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
     }
 
     @Test
@@ -298,7 +299,7 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(Arrays.asList(topicName));
         metadata.update(cluster, time.milliseconds());
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
@@ -315,7 +316,7 @@ public class ConsumerCoordinatorTest {
                         sync.groupAssignment().containsKey(consumerId);
             }
         }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), 
subscriptions.assignedPartitions());
@@ -336,7 +337,7 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(Arrays.asList(topicName));
         metadata.update(cluster, time.milliseconds());
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         Map<String, List<String>> memberSubscriptions = 
Collections.singletonMap(consumerId, Arrays.asList(topicName));
@@ -347,14 +348,14 @@ public class ConsumerCoordinatorTest {
         consumerClient.wakeup();
 
         try {
-            coordinator.ensurePartitionAssignment();
+            coordinator.poll(time.milliseconds());
         } catch (WakeupException e) {
             // ignore
         }
 
         // now complete the second half
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), 
subscriptions.assignedPartitions());
@@ -371,7 +372,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
@@ -386,7 +387,7 @@ public class ConsumerCoordinatorTest {
             }
         }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), 
subscriptions.assignedPartitions());
@@ -402,12 +403,12 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -430,12 +431,12 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -449,8 +450,9 @@ public class ConsumerCoordinatorTest {
         }, new LeaveGroupResponse(Errors.NONE.code()).toStruct());
         coordinator.maybeLeaveGroup();
         assertTrue(received.get());
-        assertEquals(JoinGroupRequest.UNKNOWN_MEMBER_ID, coordinator.memberId);
-        assertEquals(OffsetCommitRequest.DEFAULT_GENERATION_ID, 
coordinator.generation);
+
+        AbstractCoordinator.Generation generation = coordinator.generation();
+        assertNull(generation);
     }
 
     @Test(expected = KafkaException.class)
@@ -460,13 +462,13 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let the coordinator return an unknown error on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE.code()));
         
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(),
 Errors.UNKNOWN.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
     }
 
     @Test
@@ -476,7 +478,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let the coordinator return unknown member id
@@ -493,7 +495,7 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", 
Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), 
subscriptions.assignedPartitions());
@@ -506,7 +508,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator rebalance on sync
@@ -517,7 +519,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupFollowerResponse(2, consumerId, 
"leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), 
subscriptions.assignedPartitions());
@@ -530,7 +532,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator rebalance on sync
@@ -547,7 +549,7 @@ public class ConsumerCoordinatorTest {
         }, joinGroupFollowerResponse(2, consumerId, "leader", 
Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), 
subscriptions.assignedPartitions());
@@ -560,13 +562,13 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
 
@@ -595,7 +597,7 @@ public class ConsumerCoordinatorTest {
         // we only have metadata for one topic initially
         metadata.update(TestUtils.singletonCluster(topic1, 1), 
time.milliseconds());
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // prepare initial rebalance
@@ -625,7 +627,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupLeaderResponse(2, consumerId, 
memberSubscriptions, Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), 
Errors.NONE.code()));
 
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), 
subscriptions.assignedPartitions());
@@ -656,13 +658,13 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // join the group once
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", 
"leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertEquals(1, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
@@ -671,7 +673,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
         client.prepareResponse(joinGroupFollowerResponse(2, "consumer", 
"leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         assertEquals(2, rebalanceListener.revokedCount);
         assertEquals(Collections.singleton(tp), rebalanceListener.revoked);
@@ -684,15 +686,15 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // disconnecting from the original coordinator will cause re-discovery 
and joining again
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", 
"leader", Errors.NONE.code()), true);
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", 
"leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
         assertFalse(subscriptions.partitionAssignmentNeeded());
         assertEquals(Collections.singleton(tp), 
subscriptions.assignedPartitions());
         assertEquals(1, rebalanceListener.revokedCount);
@@ -705,25 +707,26 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // coordinator doesn't like the session timeout
         client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", 
Errors.INVALID_SESSION_TIMEOUT.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
     }
 
     @Test
     public void testCommitOffsetOnly() {
         subscriptions.assignFromUser(Arrays.asList(tp));
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), callback(success));
+        coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
@@ -739,18 +742,18 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         subscriptions.seek(tp, 100);
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
         time.sleep(autoCommitIntervalMs);
-        consumerClient.poll(0);
+        coordinator.poll(time.milliseconds());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
     }
@@ -765,7 +768,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // haven't joined, so should not cause a commit
@@ -774,13 +777,13 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         subscriptions.seek(tp, 100);
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
         time.sleep(autoCommitIntervalMs);
-        consumerClient.poll(0);
+        coordinator.poll(time.milliseconds());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
     }
@@ -793,12 +796,12 @@ public class ConsumerCoordinatorTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 100);
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
         time.sleep(autoCommitIntervalMs);
-        consumerClient.poll(0);
+        coordinator.poll(time.milliseconds());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
     }
@@ -819,13 +822,13 @@ public class ConsumerCoordinatorTest {
         assertNull(subscriptions.committed(tp));
 
         // now find the coordinator
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // sleep only for the retry backoff
         time.sleep(retryBackoffMs);
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
-        consumerClient.poll(0);
+        coordinator.poll(time.milliseconds());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
     }
@@ -834,13 +837,14 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetMetadata() {
         subscriptions.assignFromUser(Arrays.asList(tp));
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L, "hello")), callback(success));
+        coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
 
         assertEquals(100L, subscriptions.committed(tp).offset());
@@ -850,10 +854,11 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetAsyncWithDefaultCallback() {
         int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), null);
+        coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, 
defaultOffsetCommitCallback.invoked);
         assertNull(defaultOffsetCommitCallback.exception);
     }
@@ -863,12 +868,12 @@ public class ConsumerCoordinatorTest {
         // enable auto-assignment
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", 
"leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         // now switch to manual assignment
         client.prepareResponse(new 
LeaveGroupResponse(Errors.NONE.code()).toStruct());
@@ -888,29 +893,32 @@ public class ConsumerCoordinatorTest {
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), callback(success));
+        coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
     }
 
     @Test
     public void testCommitOffsetAsyncFailedWithDefaultCallback() {
         int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), null);
+        coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, 
defaultOffsetCommitCallback.invoked);
         assertTrue(defaultOffsetCommitCallback.exception instanceof 
RetriableCommitFailedException);
     }
 
     @Test
     public void testCommitOffsetAsyncCoordinatorNotAvailable() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // async commit with coordinator not available
         MockCommitCallback cb = new MockCommitCallback();
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), cb);
+        coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
@@ -919,13 +927,14 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetAsyncNotCoordinator() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // async commit with not coordinator
         MockCommitCallback cb = new MockCommitCallback();
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NOT_COORDINATOR_FOR_GROUP.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), cb);
+        coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
@@ -934,13 +943,14 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetAsyncDisconnected() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // async commit with coordinator disconnected
         MockCommitCallback cb = new MockCommitCallback();
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())), true);
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), cb);
+        coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
         assertEquals(1, cb.invoked);
@@ -949,36 +959,36 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetSyncNotCoordinator() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get 
metadata, and then submit the commit request)
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NOT_COORDINATOR_FOR_GROUP.code())));
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)));
     }
 
     @Test
     public void testCommitOffsetSyncCoordinatorNotAvailable() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get 
metadata, and then submit the commit request)
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)));
     }
 
     @Test
     public void testCommitOffsetSyncCoordinatorDisconnected() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get 
metadata, and then submit the commit request)
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())), true);
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.NONE.code())));
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)));
     }
@@ -986,7 +996,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = OffsetMetadataTooLarge.class)
     public void testCommitOffsetMetadataTooLarge() {
         // since offset metadata is provided by the user, we have to propagate 
the exception so they can handle it
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.OFFSET_METADATA_TOO_LARGE.code())));
@@ -996,7 +1006,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = CommitFailedException.class)
     public void testCommitOffsetIllegalGeneration() {
         // we cannot retry if a rebalance occurs before the commit completed
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.ILLEGAL_GENERATION.code())));
@@ -1006,7 +1016,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = CommitFailedException.class)
     public void testCommitOffsetUnknownMemberId() {
         // we cannot retry if a rebalance occurs before the commit completed
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.UNKNOWN_MEMBER_ID.code())));
@@ -1016,7 +1026,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = CommitFailedException.class)
     public void testCommitOffsetRebalanceInProgress() {
         // we cannot retry if a rebalance occurs before the commit completed
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, 
Errors.REBALANCE_IN_PROGRESS.code())));
@@ -1025,7 +1035,7 @@ public class ConsumerCoordinatorTest {
 
     @Test(expected = KafkaException.class)
     public void testCommitOffsetSyncCallbackWithNonRetriableException() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with invalid partitions should throw if we have no 
callback
@@ -1035,7 +1045,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffset() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
@@ -1048,7 +1058,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffsetLoadInProgress() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
@@ -1062,13 +1072,13 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffsetNotCoordinatorForConsumer() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(tp, 
Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L));
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 
100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
@@ -1077,7 +1087,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffsetWithNoFetchableOffsets() {
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
@@ -1122,12 +1132,12 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(topics);
         subscriptions.needReassignment();
 
-        client.prepareResponse(consumerMetadataResponse(node, 
Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), 
Errors.NONE.code()));
-        coordinator.ensurePartitionAssignment();
+        coordinator.poll(time.milliseconds());
 
         metadata.update(TestUtils.singletonCluster(topicName, 2), 
time.milliseconds());
         assertTrue("Topic not found in metadata", 
metadata.containsTopic(topicName));
@@ -1150,6 +1160,7 @@ public class ConsumerCoordinatorTest {
         return new ConsumerCoordinator(
                 consumerClient,
                 groupId,
+                rebalanceTimeoutMs,
                 sessionTimeoutMs,
                 heartbeatIntervalMs,
                 assignors,
@@ -1166,7 +1177,7 @@ public class ConsumerCoordinatorTest {
                 excludeInternalTopics);
     }
 
-    private Struct consumerMetadataResponse(Node node, short error) {
+    private Struct groupCoordinatorResponse(Node node, short error) {
         GroupCoordinatorResponse response = new 
GroupCoordinatorResponse(error, node);
         return response.toStruct();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index f0f2a97..8dcbde2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -15,9 +15,9 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -76,22 +76,6 @@ public class ConsumerNetworkClientTest {
     }
 
     @Test
-    public void schedule() {
-        TestDelayedTask task = new TestDelayedTask();
-        consumerClient.schedule(task, time.milliseconds());
-        consumerClient.poll(0);
-        assertEquals(1, task.executions);
-
-        consumerClient.schedule(task, time.milliseconds() + 100);
-        consumerClient.poll(0);
-        assertEquals(1, task.executions);
-
-        time.sleep(100);
-        consumerClient.poll(0);
-        assertEquals(2, task.executions);
-    }
-
-    @Test
     public void wakeup() {
         RequestFuture<ClientResponse> future = consumerClient.send(node, 
ApiKeys.METADATA, heartbeatRequest());
         consumerClient.wakeup();
@@ -175,12 +159,4 @@ public class ConsumerNetworkClientTest {
         return response.toStruct();
     }
 
-    private static class TestDelayedTask implements DelayedTask {
-        int executions = 0;
-        @Override
-        public void run(long now) {
-            executions++;
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
deleted file mode 100644
index db87b66..0000000
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-
-public class DelayedTaskQueueTest {
-    private DelayedTaskQueue scheduler = new DelayedTaskQueue();
-    private ArrayList<DelayedTask> executed = new ArrayList<DelayedTask>();
-
-    @Test
-    public void testScheduling() {
-        // Empty scheduler
-        assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(0));
-        scheduler.poll(0);
-        assertEquals(Collections.emptyList(), executed);
-
-        TestTask task1 = new TestTask();
-        TestTask task2 = new TestTask();
-        TestTask task3 = new TestTask();
-        scheduler.add(task1, 20);
-        assertEquals(20, scheduler.nextTimeout(0));
-        scheduler.add(task2, 10);
-        assertEquals(10, scheduler.nextTimeout(0));
-        scheduler.add(task3, 30);
-        assertEquals(10, scheduler.nextTimeout(0));
-
-        scheduler.poll(5);
-        assertEquals(Collections.emptyList(), executed);
-        assertEquals(5, scheduler.nextTimeout(5));
-
-        scheduler.poll(10);
-        assertEquals(Arrays.asList(task2), executed);
-        assertEquals(10, scheduler.nextTimeout(10));
-
-        scheduler.poll(20);
-        assertEquals(Arrays.asList(task2, task1), executed);
-        assertEquals(20, scheduler.nextTimeout(10));
-
-        scheduler.poll(30);
-        assertEquals(Arrays.asList(task2, task1, task3), executed);
-        assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(30));
-    }
-
-    @Test
-    public void testRemove() {
-        TestTask task1 = new TestTask();
-        TestTask task2 = new TestTask();
-        TestTask task3 = new TestTask();
-        scheduler.add(task1, 20);
-        scheduler.add(task2, 10);
-        scheduler.add(task3, 30);
-        scheduler.add(task1, 40);
-        assertEquals(10, scheduler.nextTimeout(0));
-
-        scheduler.remove(task2);
-        assertEquals(20, scheduler.nextTimeout(0));
-
-        scheduler.remove(task1);
-        assertEquals(30, scheduler.nextTimeout(0));
-
-        scheduler.remove(task3);
-        assertEquals(Long.MAX_VALUE, scheduler.nextTimeout(0));
-    }
-
-    private class TestTask implements DelayedTask {
-        @Override
-        public void run(long now) {
-            executed.add(this);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index ba04cb5..5186618 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -438,6 +438,7 @@ public class FetcherTest {
         fetcherNoAutoReset.sendFetches();
         client.prepareResponse(fetchResponse(this.records.buffer(), 
Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
+
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
         try {
             fetcherNoAutoReset.fetchedRecords();

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index 75e68cc..0177c79 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -28,8 +28,10 @@ public class HeartbeatTest {
 
     private long timeout = 300L;
     private long interval = 100L;
+    private long maxPollInterval = 900L;
+    private long retryBackoff = 10L;
     private MockTime time = new MockTime();
-    private Heartbeat heartbeat = new Heartbeat(timeout, interval, -1L);
+    private Heartbeat heartbeat = new Heartbeat(timeout, interval, 
maxPollInterval, retryBackoff);
 
     @Test
     public void testShouldHeartbeat() {
@@ -64,7 +66,7 @@ public class HeartbeatTest {
     public void testResetSession() {
         heartbeat.sentHeartbeat(time.milliseconds());
         time.sleep(305);
-        heartbeat.resetSessionTimeout(time.milliseconds());
+        heartbeat.resetTimeouts(time.milliseconds());
         assertFalse(heartbeat.sessionTimeoutExpired(time.milliseconds()));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index be7f974..766c745 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -55,8 +55,9 @@ public class RequestResponseTest {
                 createHeartBeatRequest(),
                 createHeartBeatRequest().getErrorResponse(0, new 
UnknownServerException()),
                 createHeartBeatResponse(),
-                createJoinGroupRequest(),
-                createJoinGroupRequest().getErrorResponse(0, new 
UnknownServerException()),
+                createJoinGroupRequest(1),
+                createJoinGroupRequest(0).getErrorResponse(0, new 
UnknownServerException()),
+                createJoinGroupRequest(1).getErrorResponse(1, new 
UnknownServerException()),
                 createJoinGroupResponse(),
                 createLeaveGroupRequest(),
                 createLeaveGroupRequest().getErrorResponse(0, new 
UnknownServerException()),
@@ -118,6 +119,7 @@ public class RequestResponseTest {
         checkSerialization(createOffsetCommitRequest(0).getErrorResponse(0, 
new UnknownServerException()), 0);
         checkSerialization(createOffsetCommitRequest(1), 1);
         checkSerialization(createOffsetCommitRequest(1).getErrorResponse(1, 
new UnknownServerException()), 1);
+        checkSerialization(createJoinGroupRequest(0), 0);
         checkSerialization(createUpdateMetadataRequest(0, null), 0);
         checkSerialization(createUpdateMetadataRequest(0, 
null).getErrorResponse(0, new UnknownServerException()), 0);
         checkSerialization(createUpdateMetadataRequest(1, null), 1);
@@ -236,11 +238,15 @@ public class RequestResponseTest {
         return new HeartbeatResponse(Errors.NONE.code());
     }
 
-    private AbstractRequest createJoinGroupRequest() {
+    private AbstractRequest createJoinGroupRequest(int version) {
         ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
         List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
         protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", 
metadata));
-        return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", 
protocols);
+        if (version == 0) {
+            return new JoinGroupRequest("group1", 30000, "consumer1", 
"consumer", protocols);
+        } else {
+            return new JoinGroupRequest("group1", 10000, 60000, "consumer1", 
"consumer", protocols);
+        }
     }
 
     private AbstractRequestResponse createJoinGroupResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index f5aa8ae..6e9d7b4 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -43,13 +43,31 @@ public class DistributedConfig extends WorkerConfig {
      * <code>session.timeout.ms</code>
      */
     public static final String SESSION_TIMEOUT_MS_CONFIG = 
"session.timeout.ms";
-    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to 
detect failures when using Kafka's group management facilities.";
+    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to 
detect worker failures. " +
+            "The worker sends periodic heartbeats to indicate its liveness to 
the broker. If no heartbeats are " +
+            "received by the broker before the expiration of this session 
timeout, then the broker will remove the " +
+            "worker from the group and initiate a rebalance. Note that the 
value must be in the allowable range as " +
+            "configured in the broker configuration by 
<code>group.min.session.timeout.ms</code> " +
+            "and <code>group.max.session.timeout.ms</code>.";
 
     /**
      * <code>heartbeat.interval.ms</code>
      */
     public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
"heartbeat.interval.ms";
-    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time 
between heartbeats to the group coordinator when using Kafka's group management 
facilities. Heartbeats are used to ensure that the worker's session stays 
active and to facilitate rebalancing when new members join or leave the group. 
The value must be set lower than <code>session.timeout.ms</code>, but typically 
should be set no higher than 1/3 of that value. It can be adjusted even lower 
to control the expected time for normal rebalances.";
+    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time 
between heartbeats to the group " +
+            "coordinator when using Kafka's group management facilities. 
Heartbeats are used to ensure that the " +
+            "worker's session stays active and to facilitate rebalancing when 
new members join or leave the group. " +
+            "The value must be set lower than <code>session.timeout.ms</code>, 
but typically should be set no higher " +
+            "than 1/3 of that value. It can be adjusted even lower to control 
the expected time for normal rebalances.";
+
+    /**
+     * <code>rebalance.timeout.ms</code>
+     */
+    public static final String REBALANCE_TIMEOUT_MS_CONFIG = 
"rebalance.timeout.ms";
+    private static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum 
allowed time for each worker to join the group " +
+            "once a rebalance has begun. This is basically a limit on the 
amount of time needed for all tasks to " +
+            "flush any pending data and commit offsets. If the timeout is 
exceeded, then the worker will be removed " +
+            "from the group, which will cause offset commit failures.";
 
     /**
      * <code>worker.sync.timeout.ms</code>
@@ -90,9 +108,14 @@ public class DistributedConfig extends WorkerConfig {
                 .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, GROUP_ID_DOC)
                 .define(SESSION_TIMEOUT_MS_CONFIG,
                         ConfigDef.Type.INT,
-                        30000,
+                        10000,
                         ConfigDef.Importance.HIGH,
                         SESSION_TIMEOUT_MS_DOC)
+                .define(REBALANCE_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        60000,
+                        ConfigDef.Importance.HIGH,
+                        REBALANCE_TIMEOUT_MS_DOC)
                 .define(HEARTBEAT_INTERVAL_MS_CONFIG,
                         ConfigDef.Type.INT,
                         3000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 9c74960..9114555 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.utils.CircularIterator;
 import org.apache.kafka.common.utils.Time;
@@ -63,6 +64,7 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
      */
     public WorkerCoordinator(ConsumerNetworkClient client,
                              String groupId,
+                             int rebalanceTimeoutMs,
                              int sessionTimeoutMs,
                              int heartbeatIntervalMs,
                              Metrics metrics,
@@ -74,6 +76,7 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
                              WorkerRebalanceListener listener) {
         super(client,
                 groupId,
+                rebalanceTimeoutMs,
                 sessionTimeoutMs,
                 heartbeatIntervalMs,
                 metrics,
@@ -97,6 +100,32 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
         return "connect";
     }
 
+    public void poll(long timeout) {
+        // poll for io until the timeout expires; elapsed time is measured from
+        // start because now + timeout overflows for timeouts near Long.MAX_VALUE
+        final long start = time.milliseconds();
+        long now = start;
+        do {
+            if (coordinatorUnknown()) {
+                ensureCoordinatorReady();
+                now = time.milliseconds();
+            }
+
+            if (needRejoin()) {
+                ensureActiveGroup();
+                now = time.milliseconds();
+            }
+
+            pollHeartbeat(now);
+
+            // Note that because the network client is shared with the 
background heartbeat thread,
+            // we do not want to block in poll longer than the time to the 
next heartbeat.
+            long remaining = Math.max(0, timeout - (now - start));
+            client.poll(Math.min(remaining, timeToNextHeartbeat(now)));
+            now = time.milliseconds();
+        } while (now - start < timeout);
+    }
+
     @Override
     public List<ProtocolMetadata> metadata() {
         configSnapshot = configStorage.snapshot();
@@ -238,12 +267,15 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
     }
 
     @Override
-    public boolean needRejoin() {
+    protected boolean needRejoin() {
         return super.needRejoin() || (assignmentSnapshot == null || 
assignmentSnapshot.failed()) || rejoinRequested;
     }
 
     public String memberId() {
-        return this.memberId;
+        Generation generation = generation();
+        if (generation != null)
+            return generation.memberId;
+        return JoinGroupRequest.UNKNOWN_MEMBER_ID;
     }
 
     @Override
@@ -252,7 +284,7 @@ public final class WorkerCoordinator extends 
AbstractCoordinator implements Clos
     }
 
     private boolean isLeader() {
-        return assignmentSnapshot != null && 
memberId.equals(assignmentSnapshot.leader());
+        return assignmentSnapshot != null && 
memberId().equals(assignmentSnapshot.leader());
     }
 
     public String ownerUrl(String connector) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index c21b9bf..a5213db 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -104,6 +104,7 @@ public class WorkerGroupMember {
                     
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
             this.coordinator = new WorkerCoordinator(this.client,
                     config.getString(DistributedConfig.GROUP_ID_CONFIG),
+                    
config.getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG),
                     config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
                     
config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                     metrics,
@@ -131,23 +132,13 @@ public class WorkerGroupMember {
     }
 
     public void ensureActive() {
-        coordinator.ensureCoordinatorReady();
-        coordinator.ensureActiveGroup();
+        coordinator.poll(0);
     }
 
     public void poll(long timeout) {
         if (timeout < 0)
             throw new IllegalArgumentException("Timeout must not be negative");
-
-        // poll for new data until the timeout expires
-        long remaining = timeout;
-        while (remaining >= 0) {
-            long start = time.milliseconds();
-            coordinator.ensureCoordinatorReady();
-            coordinator.ensureActiveGroup();
-            client.poll(remaining);
-            remaining -= time.milliseconds() - start;
-        }
+        coordinator.poll(timeout);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 4c2ac40..3bfa83f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -67,6 +67,7 @@ public class WorkerCoordinatorTest {
 
     private String groupId = "test-group";
     private int sessionTimeoutMs = 10;
+    private int rebalanceTimeoutMs = 60;
     private int heartbeatIntervalMs = 2;
     private long retryBackoffMs = 100;
     private MockTime time;
@@ -98,6 +99,7 @@ public class WorkerCoordinatorTest {
 
         this.coordinator = new WorkerCoordinator(consumerClient,
                 groupId,
+                rebalanceTimeoutMs,
                 sessionTimeoutMs,
                 heartbeatIntervalMs,
                 metrics,

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 666d0e7..d955225 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -51,7 +51,10 @@ object ApiVersion {
     "0.10.0-IV0" -> KAFKA_0_10_0_IV0,
     // 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake).
     "0.10.0-IV1" -> KAFKA_0_10_0_IV1,
-    "0.10.0" -> KAFKA_0_10_0_IV1
+    "0.10.0" -> KAFKA_0_10_0_IV1,
+
+    // introduced for JoinGroup protocol change in KIP-62
+    "0.10.1-IV0" -> KAFKA_0_10_1_IV0
   )
 
   private val versionPattern = "\\.".r
@@ -111,3 +114,9 @@ case object KAFKA_0_10_0_IV1 extends ApiVersion {
   val messageFormatVersion: Byte = Message.MagicValue_V1
   val id: Int = 5
 }
+
+case object KAFKA_0_10_1_IV0 extends ApiVersion {
+  val version: String = "0.10.1-IV0"
+  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val id: Int = 6
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 0d02a4c..726426a 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -93,6 +93,7 @@ class GroupCoordinator(val brokerId: Int,
                       memberId: String,
                       clientId: String,
                       clientHost: String,
+                      rebalanceTimeoutMs: Int,
                       sessionTimeoutMs: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
@@ -118,11 +119,11 @@ class GroupCoordinator(val brokerId: Int,
             responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
           } else {
             val group = groupManager.addGroup(new GroupMetadata(groupId))
-            doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
+            doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
           }
 
         case Some(group) =>
-          doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
+          doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
       }
     }
   }
@@ -131,6 +132,7 @@ class GroupCoordinator(val brokerId: Int,
                           memberId: String,
                           clientId: String,
                           clientHost: String,
+                          rebalanceTimeoutMs: Int,
                           sessionTimeoutMs: Int,
                           protocolType: String,
                           protocols: List[(String, Array[Byte])],
@@ -154,7 +156,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case PreparingRebalance =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               updateMemberAndRebalance(group, member, protocols, responseCallback)
@@ -162,7 +164,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case AwaitingSync =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (member.matches(protocols)) {
@@ -189,7 +191,7 @@ class GroupCoordinator(val brokerId: Int,
           case Empty | Stable =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
               // if the member id is unknown, register the member to the group
-              addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
+              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (memberId == group.leaderId || !member.matches(protocols)) {
@@ -256,7 +258,6 @@ class GroupCoordinator(val brokerId: Int,
 
           case AwaitingSync =>
             group.get(memberId).awaitingSyncCallback = responseCallback
-            completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
 
             // if this is the leader, then we can attempt to persist state and transition to stable
             if (memberId == group.leaderId) {
@@ -299,7 +300,7 @@ class GroupCoordinator(val brokerId: Int,
     delayedGroupStore.foreach(groupManager.store)
   }
 
-  def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
+  def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Short => Unit) {
     if (!isActive.get) {
       responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
     } else if (!isCoordinatorForGroup(groupId)) {
@@ -317,10 +318,10 @@ class GroupCoordinator(val brokerId: Int,
 
         case Some(group) =>
           group synchronized {
-            if (group.is(Dead) || !group.has(consumerId)) {
+            if (group.is(Dead) || !group.has(memberId)) {
               responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
             } else {
-              val member = group.get(consumerId)
+              val member = group.get(memberId)
               removeHeartbeatForLeavingMember(group, member)
               onMemberFailure(group, member)
               responseCallback(Errors.NONE.code)
@@ -343,27 +344,49 @@ class GroupCoordinator(val brokerId: Int,
       responseCallback(Errors.NONE.code)
     } else {
       groupManager.getGroup(groupId) match {
-        case None => responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+        case None =>
+          responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+
         case Some(group) =>
           group synchronized {
-            if (group.is(Empty)) {
-              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-            } else if (group.is(Dead)) {
-              // if the group is marked as dead, it means some other thread has just removed the group
-              // from the coordinator metadata; this is likely that the group has migrated to some other
-              // coordinator OR the group is in a transient unstable phase. Let the member retry
-              // joining without the specified member id,
-              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-            } else if (!group.is(Stable)) {
-              responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
-            } else if (!group.has(memberId)) {
-              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
-            } else if (generationId != group.generationId) {
-              responseCallback(Errors.ILLEGAL_GENERATION.code)
-            } else {
-              val member = group.get(memberId)
-              completeAndScheduleNextHeartbeatExpiration(group, member)
-              responseCallback(Errors.NONE.code)
+            group.currentState match {
+              case Dead =>
+                // if the group is marked as dead, it means some other thread has just removed the group
+                // from the coordinator metadata; this is likely that the group has migrated to some other
+                // coordinator OR the group is in a transient unstable phase. Let the member retry
+                // joining without the specified member id,
+                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+
+              case Empty =>
+                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+
+              case AwaitingSync =>
+                if (!group.has(memberId))
+                  responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+                else
+                  responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
+
+              case PreparingRebalance =>
+                if (!group.has(memberId)) {
+                  responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+                } else if (generationId != group.generationId) {
+                  responseCallback(Errors.ILLEGAL_GENERATION.code)
+                } else {
+                  val member = group.get(memberId)
+                  completeAndScheduleNextHeartbeatExpiration(group, member)
+                  responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
+                }
+
+              case Stable =>
+                if (!group.has(memberId)) {
+                  responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+                } else if (generationId != group.generationId) {
+                  responseCallback(Errors.ILLEGAL_GENERATION.code)
+                } else {
+                  val member = group.get(memberId)
+                  completeAndScheduleNextHeartbeatExpiration(group, member)
+                  responseCallback(Errors.NONE.code)
+                }
             }
           }
       }
@@ -585,7 +608,8 @@ class GroupCoordinator(val brokerId: Int,
     heartbeatPurgatory.checkAndComplete(memberKey)
   }
 
-  private def addMemberAndRebalance(sessionTimeoutMs: Int,
+  private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
+                                    sessionTimeoutMs: Int,
                                     clientId: String,
                                     clientHost: String,
                                     protocolType: String,
@@ -594,7 +618,8 @@ class GroupCoordinator(val brokerId: Int,
                                     callback: JoinCallback) = {
     // use the client-id with a random id suffix as the member-id
     val memberId = clientId + "-" + group.generateMemberIdSuffix
-    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols)
+    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
+      sessionTimeoutMs, protocolType, protocols)
     member.awaitingJoinCallback = callback
     group.add(member.memberId, member)
     maybePrepareRebalance(group)
@@ -625,7 +650,7 @@ class GroupCoordinator(val brokerId: Int,
     group.transitionTo(PreparingRebalance)
     info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
 
-    val rebalanceTimeout = group.rebalanceTimeout
+    val rebalanceTimeout = group.rebalanceTimeoutMs
     val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
     val groupKey = GroupKey(group.groupId)
     joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
@@ -770,7 +795,8 @@ object GroupCoordinator {
     val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
 
-    val groupMetadataManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time)
+    val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
+      offsetConfig, replicaManager, zkUtils, time)
     new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index b455964..c86c7f8 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -190,8 +190,8 @@ private[coordinator] class GroupMetadata(val groupId: String, initialState: Grou
 
   def allMemberMetadata = members.values.toList
 
-  def rebalanceTimeout = members.values.foldLeft(0) {(timeout, member) =>
-    timeout.max(member.sessionTimeoutMs)
+  def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) =>
+    timeout.max(member.rebalanceTimeoutMs)
   }
 
   // TODO: decide if ids should be predictable or random

http://git-wip-us.apache.org/repos/asf/kafka/blob/40b1dd3f/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index ef8b295..cf8ae91 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -47,10 +47,12 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
+import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
 import kafka.utils.CoreUtils.inLock
 
 
 class GroupMetadataManager(val brokerId: Int,
+                           val interBrokerProtocolVersion: ApiVersion,
                            val config: OffsetConfig,
                            replicaManager: ReplicaManager,
                            zkUtils: ZkUtils,
@@ -175,9 +177,11 @@ class GroupMetadataManager(val brokerId: Int,
                         groupAssignment: Map[String, Array[Byte]],
                         responseCallback: Short => Unit): DelayedStore = {
     val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
+    val groupMetadataValueVersion = if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0) 0.toShort else GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
+
     val message = new Message(
       key = GroupMetadataManager.groupMetadataKey(group.groupId),
-      bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment),
+      bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion),
       timestamp = timestamp,
       magicValue = magicValue)
 
@@ -704,30 +708,51 @@ object GroupMetadataManager {
   private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
   private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
 
-  private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING),
-    new Field("client_id", STRING),
-    new Field("client_host", STRING),
-    new Field("session_timeout", INT32),
-    new Field("subscription", BYTES),
-    new Field("assignment", BYTES))
-  private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id")
-  private val MEMBER_METADATA_CLIENT_ID_V0 = MEMBER_METADATA_V0.get("client_id")
-  private val MEMBER_METADATA_CLIENT_HOST_V0 = MEMBER_METADATA_V0.get("client_host")
-  private val MEMBER_METADATA_SESSION_TIMEOUT_V0 = MEMBER_METADATA_V0.get("session_timeout")
-  private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription")
-  private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment")
-
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(new Field("protocol_type", STRING),
-    new Field("generation", INT32),
-    new Field("protocol", NULLABLE_STRING),
-    new Field("leader", NULLABLE_STRING),
-    new Field("members", new ArrayOf(MEMBER_METADATA_V0)))
-  private val GROUP_METADATA_PROTOCOL_TYPE_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol_type")
-  private val GROUP_METADATA_GENERATION_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("generation")
-  private val GROUP_METADATA_PROTOCOL_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol")
-  private val GROUP_METADATA_LEADER_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("leader")
-  private val GROUP_METADATA_MEMBERS_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("members")
+  private val MEMBER_ID_KEY = "member_id"
+  private val CLIENT_ID_KEY = "client_id"
+  private val CLIENT_HOST_KEY = "client_host"
+  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
+  private val SESSION_TIMEOUT_KEY = "session_timeout"
+  private val SUBSCRIPTION_KEY = "subscription"
+  private val ASSIGNMENT_KEY = "assignment"
+
+  private val MEMBER_METADATA_V0 = new Schema(
+    new Field(MEMBER_ID_KEY, STRING),
+    new Field(CLIENT_ID_KEY, STRING),
+    new Field(CLIENT_HOST_KEY, STRING),
+    new Field(SESSION_TIMEOUT_KEY, INT32),
+    new Field(SUBSCRIPTION_KEY, BYTES),
+    new Field(ASSIGNMENT_KEY, BYTES))
+
+  private val MEMBER_METADATA_V1 = new Schema(
+    new Field(MEMBER_ID_KEY, STRING),
+    new Field(CLIENT_ID_KEY, STRING),
+    new Field(CLIENT_HOST_KEY, STRING),
+    new Field(REBALANCE_TIMEOUT_KEY, INT32),
+    new Field(SESSION_TIMEOUT_KEY, INT32),
+    new Field(SUBSCRIPTION_KEY, BYTES),
+    new Field(ASSIGNMENT_KEY, BYTES))
+
+  private val PROTOCOL_TYPE_KEY = "protocol_type"
+  private val GENERATION_KEY = "generation"
+  private val PROTOCOL_KEY = "protocol"
+  private val LEADER_KEY = "leader"
+  private val MEMBERS_KEY = "members"
+
+  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
+    new Field(PROTOCOL_TYPE_KEY, STRING),
+    new Field(GENERATION_KEY, INT32),
+    new Field(PROTOCOL_KEY, NULLABLE_STRING),
+    new Field(LEADER_KEY, NULLABLE_STRING),
+    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
+
+  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
+    new Field(PROTOCOL_TYPE_KEY, STRING),
+    new Field(GENERATION_KEY, INT32),
+    new Field(PROTOCOL_KEY, NULLABLE_STRING),
+    new Field(LEADER_KEY, NULLABLE_STRING),
+    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
+
 
   // map of versions to key schemas as data types
   private val MESSAGE_TYPE_SCHEMAS = Map(
@@ -742,8 +767,10 @@ object GroupMetadataManager {
   private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
 
   // map of version of group metadata value schemas
-  private val GROUP_VALUE_SCHEMAS = Map(0 -> GROUP_METADATA_VALUE_SCHEMA_V0)
-  private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 0.toShort
+  private val GROUP_VALUE_SCHEMAS = Map(
+    0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
+    1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
+  private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
@@ -830,40 +857,47 @@ object GroupMetadataManager {
    * Generates the payload for group metadata message from given offset and metadata
    * assuming the generation id, selected protocol, leader and member assignment are all available
    *
-   * @param groupMetadata
+   * @param groupMetadata current group metadata
+   * @param assignment the assignment for the rebalancing generation
+   * @param version the version of the value message to use
    * @return payload for offset commit message
    */
-  def groupMetadataValue(groupMetadata: GroupMetadata, assignment: Map[String, Array[Byte]]): Array[Byte] = {
-    // generate commit value with schema version 1
-    val value = new Struct(CURRENT_GROUP_VALUE_SCHEMA)
-    value.set(GROUP_METADATA_PROTOCOL_TYPE_V0, groupMetadata.protocolType.getOrElse(""))
-    value.set(GROUP_METADATA_GENERATION_V0, groupMetadata.generationId)
-    value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol)
-    value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId)
+  def groupMetadataValue(groupMetadata: GroupMetadata,
+                         assignment: Map[String, Array[Byte]],
+                         version: Short = 0): Array[Byte] = {
+    val value = if (version == 0) new Struct(GROUP_METADATA_VALUE_SCHEMA_V0) else new Struct(CURRENT_GROUP_VALUE_SCHEMA)
+
+    value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
+    value.set(GENERATION_KEY, groupMetadata.generationId)
+    value.set(PROTOCOL_KEY, groupMetadata.protocol)
+    value.set(LEADER_KEY, groupMetadata.leaderId)
 
     val memberArray = groupMetadata.allMemberMetadata.map {
       case memberMetadata =>
-        val memberStruct = value.instance(GROUP_METADATA_MEMBERS_V0)
-        memberStruct.set(MEMBER_METADATA_MEMBER_ID_V0, memberMetadata.memberId)
-        memberStruct.set(MEMBER_METADATA_CLIENT_ID_V0, memberMetadata.clientId)
-        memberStruct.set(MEMBER_METADATA_CLIENT_HOST_V0, memberMetadata.clientHost)
-        memberStruct.set(MEMBER_METADATA_SESSION_TIMEOUT_V0, memberMetadata.sessionTimeoutMs)
+        val memberStruct = value.instance(MEMBERS_KEY)
+        memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
+        memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
+        memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
+        memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
+
+        if (version > 0)
+          memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs)
 
         val metadata = memberMetadata.metadata(groupMetadata.protocol)
-        memberStruct.set(MEMBER_METADATA_SUBSCRIPTION_V0, ByteBuffer.wrap(metadata))
+        memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
 
         val memberAssignment = assignment(memberMetadata.memberId)
         assert(memberAssignment != null)
 
-        memberStruct.set(MEMBER_METADATA_ASSIGNMENT_V0, ByteBuffer.wrap(memberAssignment))
+        memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
 
         memberStruct
     }
 
-    value.set(GROUP_METADATA_MEMBERS_V0, memberArray.toArray)
+    value.set(MEMBERS_KEY, memberArray.toArray)
 
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
+    byteBuffer.putShort(version)
     value.writeTo(byteBuffer)
     byteBuffer.array()
   }
@@ -944,31 +978,33 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroup(version)
       val value = valueSchema.read(buffer)
 
-      if (version == 0) {
-        val protocolType = value.get(GROUP_METADATA_PROTOCOL_TYPE_V0).asInstanceOf[String]
+      if (version == 0 || version == 1) {
+        val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
 
-        val memberMetadataArray = value.getArray(GROUP_METADATA_MEMBERS_V0)
+        val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
 
         val group = new GroupMetadata(groupId, initialState)
 
-        group.generationId = value.get(GROUP_METADATA_GENERATION_V0).asInstanceOf[Int]
-        group.leaderId = value.get(GROUP_METADATA_LEADER_V0).asInstanceOf[String]
-        group.protocol = value.get(GROUP_METADATA_PROTOCOL_V0).asInstanceOf[String]
+        group.generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
+        group.leaderId = value.get(LEADER_KEY).asInstanceOf[String]
+        group.protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
 
         memberMetadataArray.foreach {
           case memberMetadataObj =>
             val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
-            val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String]
-            val clientId = memberMetadata.get(MEMBER_METADATA_CLIENT_ID_V0).asInstanceOf[String]
-            val clientHost = memberMetadata.get(MEMBER_METADATA_CLIENT_HOST_V0).asInstanceOf[String]
-            val sessionTimeout = memberMetadata.get(MEMBER_METADATA_SESSION_TIMEOUT_V0).asInstanceOf[Int]
-            val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer])
+            val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
+            val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
+            val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
+            val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
+            val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
+
+            val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
 
-            val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout,
+            val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
               protocolType, List((group.protocol, subscription)))
 
-            member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer])
+            member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
 
             group.add(memberId, member)
         }

Reply via email to