http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java new file mode 100644 index 0000000..93994d7 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -0,0 +1,749 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.OffsetMetadataTooLarge; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.GroupMetadataResponse; +import org.apache.kafka.common.requests.HeartbeatResponse; +import org.apache.kafka.common.requests.JoinGroupResponse; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.requests.SyncGroupRequest; +import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static 
org.junit.Assert.assertTrue; + +public class ConsumerCoordinatorTest { + + private String topicName = "test"; + private String groupId = "test-group"; + private TopicPartition tp = new TopicPartition(topicName, 0); + private int sessionTimeoutMs = 10; + private int heartbeatIntervalMs = 2; + private long retryBackoffMs = 100; + private long requestTimeoutMs = 5000; + private boolean autoCommitEnabled = false; + private long autoCommitIntervalMs = 5000; + private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor(); + private List<PartitionAssignor> assignors = Arrays.<PartitionAssignor>asList(partitionAssignor); + private MockTime time; + private MockClient client; + private Cluster cluster = TestUtils.singletonCluster(topicName, 1); + private Node node = cluster.nodes().get(0); + private SubscriptionState subscriptions; + private Metadata metadata; + private Metrics metrics; + private Map<String, String> metricTags = new LinkedHashMap<>(); + private ConsumerNetworkClient consumerClient; + private MockRebalanceListener rebalanceListener; + private MockCommitCallback defaultOffsetCommitCallback; + private ConsumerCoordinator coordinator; + + @Before + public void setup() { + this.time = new MockTime(); + this.client = new MockClient(time); + this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); + this.metadata = new Metadata(0, Long.MAX_VALUE); + this.metadata.update(cluster, time.milliseconds()); + this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); + this.metrics = new Metrics(time); + this.rebalanceListener = new MockRebalanceListener(); + this.defaultOffsetCommitCallback = new MockCommitCallback(); + this.partitionAssignor.clear(); + + client.setNode(node); + + this.coordinator = new ConsumerCoordinator( + consumerClient, + groupId, + sessionTimeoutMs, + heartbeatIntervalMs, + assignors, + metadata, + subscriptions, + metrics, + "consumer" + groupId, + metricTags, + time, + requestTimeoutMs, + 
retryBackoffMs, + defaultOffsetCommitCallback, + autoCommitEnabled, + autoCommitIntervalMs); + } + + @After + public void teardown() { + this.metrics.close(); + } + + @Test + public void testNormalHeartbeat() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // normal heartbeat + time.sleep(sessionTimeoutMs); + RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat + assertEquals(1, consumerClient.pendingRequestCount()); + assertFalse(future.isDone()); + + client.prepareResponse(heartbeatResponse(Errors.NONE.code())); + consumerClient.poll(0); + + assertTrue(future.isDone()); + assertTrue(future.succeeded()); + } + + @Test + public void testCoordinatorNotAvailable() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown + time.sleep(sessionTimeoutMs); + RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat + assertEquals(1, consumerClient.pendingRequestCount()); + assertFalse(future.isDone()); + + client.prepareResponse(heartbeatResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())); + time.sleep(sessionTimeoutMs); + consumerClient.poll(0); + + assertTrue(future.isDone()); + assertTrue(future.failed()); + assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), future.exception()); + assertTrue(coordinator.coordinatorUnknown()); + } + + @Test + public void testNotCoordinator() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // not_coordinator will mark coordinator as unknown + time.sleep(sessionTimeoutMs); + RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat + assertEquals(1, consumerClient.pendingRequestCount()); + 
assertFalse(future.isDone()); + + client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_GROUP.code())); + time.sleep(sessionTimeoutMs); + consumerClient.poll(0); + + assertTrue(future.isDone()); + assertTrue(future.failed()); + assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), future.exception()); + assertTrue(coordinator.coordinatorUnknown()); + } + + @Test + public void testIllegalGeneration() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // illegal_generation will cause re-partition + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.changePartitionAssignment(Collections.singletonList(tp)); + + time.sleep(sessionTimeoutMs); + RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat + assertEquals(1, consumerClient.pendingRequestCount()); + assertFalse(future.isDone()); + + client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code())); + time.sleep(sessionTimeoutMs); + consumerClient.poll(0); + + assertTrue(future.isDone()); + assertTrue(future.failed()); + assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception()); + assertTrue(coordinator.needRejoin()); + } + + @Test + public void testUnknownConsumerId() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // unknown_member_id will cause re-partition + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.changePartitionAssignment(Collections.singletonList(tp)); + + time.sleep(sessionTimeoutMs); + RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat + assertEquals(1, consumerClient.pendingRequestCount()); + assertFalse(future.isDone()); + + client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID.code())); + time.sleep(sessionTimeoutMs); + 
consumerClient.poll(0); + + assertTrue(future.isDone()); + assertTrue(future.failed()); + assertEquals(Errors.UNKNOWN_MEMBER_ID.exception(), future.exception()); + assertTrue(coordinator.needRejoin()); + } + + @Test + public void testCoordinatorDisconnect() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // coordinator disconnect will mark coordinator as unknown + time.sleep(sessionTimeoutMs); + RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat + assertEquals(1, consumerClient.pendingRequestCount()); + assertFalse(future.isDone()); + + client.prepareResponse(heartbeatResponse(Errors.NONE.code()), true); // return disconnected + time.sleep(sessionTimeoutMs); + consumerClient.poll(0); + + assertTrue(future.isDone()); + assertTrue(future.failed()); + assertTrue(future.exception() instanceof DisconnectException); + assertTrue(coordinator.coordinatorUnknown()); + } + + @Test + public void testNormalJoinGroupLeader() { + final String consumerId = "leader"; + + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.needReassignment(); + + // ensure metadata is up-to-date for leader + metadata.setTopics(Arrays.asList(topicName)); + metadata.update(cluster, time.milliseconds()); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // normal join group + Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName)); + partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp))); + + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code())); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + SyncGroupRequest sync = new SyncGroupRequest(request.request().body()); + 
return sync.memberId().equals(consumerId) && + sync.generationId() == 1 && + sync.groupAssignment().containsKey(consumerId); + } + }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + coordinator.ensurePartitionAssignment(); + + assertFalse(subscriptions.partitionAssignmentNeeded()); + assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); + assertEquals(1, rebalanceListener.revokedCount); + assertEquals(Collections.emptySet(), rebalanceListener.revoked); + assertEquals(1, rebalanceListener.assignedCount); + assertEquals(Collections.singleton(tp), rebalanceListener.assigned); + } + + @Test + public void testNormalJoinGroupFollower() { + final String consumerId = "consumer"; + + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.needReassignment(); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // normal join group + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + SyncGroupRequest sync = new SyncGroupRequest(request.request().body()); + return sync.memberId().equals(consumerId) && + sync.generationId() == 1 && + sync.groupAssignment().isEmpty(); + } + }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + + coordinator.ensurePartitionAssignment(); + + assertFalse(subscriptions.partitionAssignmentNeeded()); + assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); + assertEquals(1, rebalanceListener.revokedCount); + assertEquals(1, rebalanceListener.assignedCount); + assertEquals(Collections.singleton(tp), rebalanceListener.assigned); + } + + + @Test + public void testMetadataChangeTriggersRebalance() { + final String consumerId = "consumer"; + + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + 
subscriptions.needReassignment(); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + + coordinator.ensurePartitionAssignment(); + + assertFalse(subscriptions.partitionAssignmentNeeded()); + + // a new partition is added to the topic + metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds()); + + // we should detect the change and ask for reassignment + assertTrue(subscriptions.partitionAssignmentNeeded()); + } + + @Test + public void testRejoinGroup() { + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.needReassignment(); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // join the group once + client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + coordinator.ensurePartitionAssignment(); + + assertEquals(1, rebalanceListener.revokedCount); + assertEquals(1, rebalanceListener.assignedCount); + + // and join the group again + subscriptions.needReassignment(); + client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + coordinator.ensurePartitionAssignment(); + + assertEquals(2, rebalanceListener.revokedCount); + assertEquals(Collections.singleton(tp), rebalanceListener.revoked); + assertEquals(2, rebalanceListener.assignedCount); + assertEquals(Collections.singleton(tp), rebalanceListener.assigned); + } + + @Test + public void testDisconnectInJoin() { + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + 
subscriptions.needReassignment(); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // disconnected from original coordinator will cause re-discover and join again + client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + coordinator.ensurePartitionAssignment(); + assertFalse(subscriptions.partitionAssignmentNeeded()); + assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); + assertEquals(1, rebalanceListener.revokedCount); + assertEquals(1, rebalanceListener.assignedCount); + assertEquals(Collections.singleton(tp), rebalanceListener.assigned); + } + + @Test(expected = ApiException.class) + public void testInvalidSessionTimeout() { + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.needReassignment(); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // coordinator doesn't like the session timeout + client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code())); + coordinator.ensurePartitionAssignment(); + } + + @Test + public void testCommitOffsetOnly() { + subscriptions.assign(Arrays.asList(tp)); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + + AtomicBoolean success = new AtomicBoolean(false); + coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success)); + consumerClient.poll(0); + 
assertTrue(success.get()); + + assertEquals(100L, subscriptions.committed(tp).offset()); + } + + @Test + public void testCommitOffsetMetadata() { + subscriptions.assign(Arrays.asList(tp)); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + + AtomicBoolean success = new AtomicBoolean(false); + coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success)); + consumerClient.poll(0); + assertTrue(success.get()); + + assertEquals(100L, subscriptions.committed(tp).offset()); + assertEquals("hello", subscriptions.committed(tp).metadata()); + } + + @Test + public void testCommitOffsetAsyncWithDefaultCallback() { + int invokedBeforeTest = defaultOffsetCommitCallback.invoked; + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null); + consumerClient.poll(0); + assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked); + assertNull(defaultOffsetCommitCallback.exception); + } + + @Test + public void testResetGeneration() { + // enable auto-assignment + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + coordinator.ensurePartitionAssignment(); + + // now switch to manual assignment + subscriptions.unsubscribe(); + coordinator.resetGeneration(); + 
subscriptions.assign(Arrays.asList(tp)); + + // the client should not reuse generation/memberId from auto-subscribed generation + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body()); + return commitRequest.memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) && + commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID; + } + }, offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + + AtomicBoolean success = new AtomicBoolean(false); + coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success)); + consumerClient.poll(0); + assertTrue(success.get()); + } + + @Test + public void testCommitOffsetAsyncFailedWithDefaultCallback() { + int invokedBeforeTest = defaultOffsetCommitCallback.invoked; + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))); + coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null); + consumerClient.poll(0); + assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked); + assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception); + } + + @Test + public void testCommitOffsetAsyncCoordinatorNotAvailable() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // async commit with coordinator not available + MockCommitCallback cb = new MockCommitCallback(); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))); + coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new 
OffsetAndMetadata(100L)), cb); + consumerClient.poll(0); + + assertTrue(coordinator.coordinatorUnknown()); + assertEquals(1, cb.invoked); + assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), cb.exception); + } + + @Test + public void testCommitOffsetAsyncNotCoordinator() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // async commit with not coordinator + MockCommitCallback cb = new MockCommitCallback(); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code()))); + coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb); + consumerClient.poll(0); + + assertTrue(coordinator.coordinatorUnknown()); + assertEquals(1, cb.invoked); + assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), cb.exception); + } + + @Test + public void testCommitOffsetAsyncDisconnected() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // async commit with coordinator disconnected + MockCommitCallback cb = new MockCommitCallback(); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true); + coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb); + consumerClient.poll(0); + + assertTrue(coordinator.coordinatorUnknown()); + assertEquals(1, cb.invoked); + assertTrue(cb.exception instanceof DisconnectException); + } + + @Test + public void testCommitOffsetSyncNotCoordinator() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // sync commit rejected with not_coordinator (should look up the coordinator again, and then resubmit the commit request) + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code()))); + 
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); + } + + @Test + public void testCommitOffsetSyncCoordinatorNotAvailable() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // sync commit with coordinator not available (should look up the coordinator again, and then resubmit the commit request) + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); + } + + @Test + public void testCommitOffsetSyncCoordinatorDisconnected() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request) + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); + } + + @Test(expected = OffsetMetadataTooLarge.class) + public void testCommitOffsetMetadataTooLarge() { + // since offset metadata is provided by the user, we have to propagate the exception so they can handle it + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + 
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code()))); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata"))); + } + + @Test(expected = ApiException.class) + public void testCommitOffsetSyncCallbackWithNonRetriableException() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // sync commit that fails with a non-retriable error (UNKNOWN) should throw + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN.code())), false); + coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); + } + + @Test + public void testRefreshOffset() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + subscriptions.assign(Arrays.asList(tp)); + subscriptions.needRefreshCommits(); + client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); + coordinator.refreshCommittedOffsetsIfNeeded(); + assertFalse(subscriptions.refreshCommitsNeeded()); + assertEquals(100L, subscriptions.committed(tp).offset()); + } + + @Test + public void testRefreshOffsetLoadInProgress() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + subscriptions.assign(Arrays.asList(tp)); + subscriptions.needRefreshCommits(); + client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L)); + client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); + coordinator.refreshCommittedOffsetsIfNeeded(); + assertFalse(subscriptions.refreshCommitsNeeded()); + assertEquals(100L, subscriptions.committed(tp).offset()); + } + + @Test + public void testRefreshOffsetNotCoordinatorForConsumer() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + 
coordinator.ensureCoordinatorKnown(); + + subscriptions.assign(Arrays.asList(tp)); + subscriptions.needRefreshCommits(); + client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L)); + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); + coordinator.refreshCommittedOffsetsIfNeeded(); + assertFalse(subscriptions.refreshCommitsNeeded()); + assertEquals(100L, subscriptions.committed(tp).offset()); + } + + @Test + public void testRefreshOffsetWithNoFetchableOffsets() { + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + subscriptions.assign(Arrays.asList(tp)); + subscriptions.needRefreshCommits(); + client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L)); + coordinator.refreshCommittedOffsetsIfNeeded(); + assertFalse(subscriptions.refreshCommitsNeeded()); + assertEquals(null, subscriptions.committed(tp)); + } + + private Struct consumerMetadataResponse(Node node, short error) { + GroupMetadataResponse response = new GroupMetadataResponse(error, node); + return response.toStruct(); + } + + private Struct heartbeatResponse(short error) { + HeartbeatResponse response = new HeartbeatResponse(error); + return response.toStruct(); + } + + private Struct joinGroupLeaderResponse(int generationId, String memberId, + Map<String, List<String>> subscriptions, + short error) { + Map<String, ByteBuffer> metadata = new HashMap<>(); + for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) { + PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue()); + ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription); + metadata.put(subscriptionEntry.getKey(), buf); + } + return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, 
metadata).toStruct(); + } + + private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) { + return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, leaderId, + Collections.<String, ByteBuffer>emptyMap()).toStruct(); + } + + private Struct syncGroupResponse(List<TopicPartition> partitions, short error) { + ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions)); + return new SyncGroupResponse(error, buf).toStruct(); + } + + private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) { + OffsetCommitResponse response = new OffsetCommitResponse(responseData); + return response.toStruct(); + } + + private Struct offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) { + OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error); + OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data)); + return response.toStruct(); + } + + private OffsetCommitCallback callback(final AtomicBoolean success) { + return new OffsetCommitCallback() { + @Override + public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { + if (exception == null) + success.set(true); + } + }; + } + + private static class MockCommitCallback implements OffsetCommitCallback { + public int invoked = 0; + public Exception exception = null; + + @Override + public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { + invoked++; + this.exception = exception; + } + } + + private static class MockRebalanceListener implements ConsumerRebalanceListener { + public Collection<TopicPartition> revoked; + public Collection<TopicPartition> assigned; + public int revokedCount = 0; + public int assignedCount = 0; + + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + 
this.assigned = partitions; + assignedCount++; + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + this.revoked = partitions; + revokedCount++; + } + + } +}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java index 9de1cee..6a42058 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java @@ -106,7 +106,7 @@ public class ConsumerNetworkClientTest { private HeartbeatRequest heartbeatRequest() { - return new HeartbeatRequest("group", 1, "consumerId"); + return new HeartbeatRequest("group", 1, "memberId"); } private Struct heartbeatResponse(short error) { http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java new file mode 100644 index 0000000..8113770 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class ConsumerProtocolTest { + + @Test + public void serializeDeserializeMetadata() { + Subscription subscription = new Subscription(Arrays.asList("foo", "bar")); + + ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription); + Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer); + assertEquals(subscription.topics(), parsedSubscription.topics()); + } + + @Test + public void deserializeNewSubscriptionVersion() { + // verify that a new version which adds a field is still parseable + short version = 100; + + Schema subscriptionSchemaV100 = new Schema( + new Field(ConsumerProtocol.TOPICS_KEY_NAME, new ArrayOf(Type.STRING)), + new Field(ConsumerProtocol.USER_DATA_KEY_NAME, Type.BYTES), + new Field("foo", Type.STRING)); + + Struct subscriptionV100 = new Struct(subscriptionSchemaV100); + 
subscriptionV100.set(ConsumerProtocol.TOPICS_KEY_NAME, new Object[]{"topic"}); + subscriptionV100.set(ConsumerProtocol.USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0])); + subscriptionV100.set("foo", "bar"); + + Struct headerV100 = new Struct(ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA); + headerV100.set(ConsumerProtocol.VERSION_KEY_NAME, version); + + ByteBuffer buffer = ByteBuffer.allocate(subscriptionV100.sizeOf() + headerV100.sizeOf()); + headerV100.writeTo(buffer); + subscriptionV100.writeTo(buffer); + + buffer.flip(); + + Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer); + assertEquals(Arrays.asList("topic"), subscription.topics()); + } + + @Test + public void serializeDeserializeAssignment() { + List<TopicPartition> partitions = Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("bar", 2)); + ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions)); + PartitionAssignor.Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer); + assertEquals(toSet(partitions), toSet(parsedAssignment.partitions())); + } + + @Test + public void deserializeNewAssignmentVersion() { + // verify that a new version which adds a field is still parseable + short version = 100; + + Schema assignmentSchemaV100 = new Schema( + new Field(ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(ConsumerProtocol.TOPIC_ASSIGNMENT_V0)), + new Field(ConsumerProtocol.USER_DATA_KEY_NAME, Type.BYTES), + new Field("foo", Type.STRING)); + + Struct assignmentV100 = new Struct(assignmentSchemaV100); + assignmentV100.set(ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME, + new Object[]{new Struct(ConsumerProtocol.TOPIC_ASSIGNMENT_V0) + .set(ConsumerProtocol.TOPIC_KEY_NAME, "foo") + .set(ConsumerProtocol.PARTITIONS_KEY_NAME, new Object[]{1})}); + assignmentV100.set(ConsumerProtocol.USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0])); + assignmentV100.set("foo", "bar"); + + Struct headerV100 = new 
Struct(ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA); + headerV100.set(ConsumerProtocol.VERSION_KEY_NAME, version); + + ByteBuffer buffer = ByteBuffer.allocate(assignmentV100.sizeOf() + headerV100.sizeOf()); + headerV100.writeTo(buffer); + assignmentV100.writeTo(buffer); + + buffer.flip(); + + PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer); + assertEquals(toSet(Arrays.asList(new TopicPartition("foo", 1))), toSet(assignment.partitions())); + } + + private static <T> Set<T> toSet(Collection<T> collection) { + return new HashSet<>(collection); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java deleted file mode 100644 index 66b2e32..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ /dev/null @@ -1,635 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.clients.consumer.internals; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import org.apache.kafka.clients.ClientRequest; -import org.apache.kafka.clients.Metadata; -import org.apache.kafka.clients.MockClient; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.ApiException; -import org.apache.kafka.common.errors.DisconnectException; -import org.apache.kafka.common.errors.OffsetMetadataTooLarge; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.requests.ConsumerMetadataResponse; -import org.apache.kafka.common.requests.HeartbeatResponse; -import org.apache.kafka.common.requests.JoinGroupResponse; -import org.apache.kafka.common.requests.OffsetCommitRequest; -import org.apache.kafka.common.requests.OffsetCommitResponse; -import org.apache.kafka.common.requests.OffsetFetchResponse; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.test.TestUtils; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - - -public class CoordinatorTest { - - private String topicName = "test"; - private String groupId = "test-group"; - private TopicPartition tp = 
new TopicPartition(topicName, 0); - private int sessionTimeoutMs = 10; - private int heartbeatIntervalMs = 2; - private long retryBackoffMs = 100; - private long requestTimeoutMs = 5000; - private boolean autoCommitEnabled = false; - private long autoCommitIntervalMs = 5000; - private String rebalanceStrategy = "not-matter"; - private MockTime time; - private MockClient client; - private Cluster cluster = TestUtils.singletonCluster(topicName, 1); - private Node node = cluster.nodes().get(0); - private SubscriptionState subscriptions; - private Metadata metadata; - private Metrics metrics; - private Map<String, String> metricTags = new LinkedHashMap<String, String>(); - private ConsumerNetworkClient consumerClient; - private MockRebalanceListener subscriptionListener; - private MockCommitCallback defaultOffsetCommitCallback; - private Coordinator coordinator; - - @Before - public void setup() { - this.time = new MockTime(); - this.client = new MockClient(time); - this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); - this.metadata = new Metadata(0, Long.MAX_VALUE); - this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); - this.metrics = new Metrics(time); - this.subscriptionListener = new MockRebalanceListener(); - this.defaultOffsetCommitCallback = new MockCommitCallback(); - - client.setNode(node); - - this.coordinator = new Coordinator(consumerClient, - groupId, - sessionTimeoutMs, - heartbeatIntervalMs, - rebalanceStrategy, - subscriptions, - metrics, - "consumer" + groupId, - metricTags, - time, - requestTimeoutMs, - retryBackoffMs, - defaultOffsetCommitCallback, - autoCommitEnabled, - autoCommitIntervalMs); - } - - @After - public void teardown() { - this.metrics.close(); - } - - @Test - public void testNormalHeartbeat() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // normal heartbeat - time.sleep(sessionTimeoutMs); - RequestFuture<Void> 
future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat - assertEquals(1, consumerClient.pendingRequestCount()); - assertFalse(future.isDone()); - - client.prepareResponse(heartbeatResponse(Errors.NONE.code())); - consumerClient.poll(0); - - assertTrue(future.isDone()); - assertTrue(future.succeeded()); - } - - @Test - public void testCoordinatorNotAvailable() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // consumer_coordinator_not_available will mark coordinator as unknown - time.sleep(sessionTimeoutMs); - RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat - assertEquals(1, consumerClient.pendingRequestCount()); - assertFalse(future.isDone()); - - client.prepareResponse(heartbeatResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())); - time.sleep(sessionTimeoutMs); - consumerClient.poll(0); - - assertTrue(future.isDone()); - assertTrue(future.failed()); - assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), future.exception()); - assertTrue(coordinator.coordinatorUnknown()); - } - - @Test - public void testNotCoordinator() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // not_coordinator will mark coordinator as unknown - time.sleep(sessionTimeoutMs); - RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat - assertEquals(1, consumerClient.pendingRequestCount()); - assertFalse(future.isDone()); - - client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_CONSUMER.code())); - time.sleep(sessionTimeoutMs); - consumerClient.poll(0); - - assertTrue(future.isDone()); - assertTrue(future.failed()); - assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.exception(), future.exception()); - assertTrue(coordinator.coordinatorUnknown()); - } - - @Test - public void 
testIllegalGeneration() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // illegal_generation will cause re-partition - subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener); - subscriptions.changePartitionAssignment(Collections.singletonList(tp)); - - time.sleep(sessionTimeoutMs); - RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat - assertEquals(1, consumerClient.pendingRequestCount()); - assertFalse(future.isDone()); - - client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code())); - time.sleep(sessionTimeoutMs); - consumerClient.poll(0); - - assertTrue(future.isDone()); - assertTrue(future.failed()); - assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception()); - assertTrue(subscriptions.partitionAssignmentNeeded()); - } - - @Test - public void testUnknownConsumerId() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // illegal_generation will cause re-partition - subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener); - subscriptions.changePartitionAssignment(Collections.singletonList(tp)); - - time.sleep(sessionTimeoutMs); - RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat - assertEquals(1, consumerClient.pendingRequestCount()); - assertFalse(future.isDone()); - - client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_CONSUMER_ID.code())); - time.sleep(sessionTimeoutMs); - consumerClient.poll(0); - - assertTrue(future.isDone()); - assertTrue(future.failed()); - assertEquals(Errors.UNKNOWN_CONSUMER_ID.exception(), future.exception()); - assertTrue(subscriptions.partitionAssignmentNeeded()); - } - - @Test - public void testCoordinatorDisconnect() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - 
coordinator.ensureCoordinatorKnown(); - - // coordinator disconnect will mark coordinator as unknown - time.sleep(sessionTimeoutMs); - RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat - assertEquals(1, consumerClient.pendingRequestCount()); - assertFalse(future.isDone()); - - client.prepareResponse(heartbeatResponse(Errors.NONE.code()), true); // return disconnected - time.sleep(sessionTimeoutMs); - consumerClient.poll(0); - - assertTrue(future.isDone()); - assertTrue(future.failed()); - assertTrue(future.exception() instanceof DisconnectException); - assertTrue(coordinator.coordinatorUnknown()); - } - - @Test - public void testNormalJoinGroup() { - subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener); - subscriptions.needReassignment(); - - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // normal join group - client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code())); - coordinator.ensurePartitionAssignment(); - - assertFalse(subscriptions.partitionAssignmentNeeded()); - assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); - assertEquals(1, subscriptionListener.revokedCount); - assertEquals(Collections.emptySet(), subscriptionListener.revoked); - assertEquals(1, subscriptionListener.assignedCount); - assertEquals(Collections.singleton(tp), subscriptionListener.assigned); - } - - @Test - public void testReJoinGroup() { - subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener); - subscriptions.needReassignment(); - - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // disconnected from original coordinator will cause re-discover and join again - client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()), true); - 
client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code())); - coordinator.ensurePartitionAssignment(); - assertFalse(subscriptions.partitionAssignmentNeeded()); - assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); - assertEquals(1, subscriptionListener.revokedCount); - assertEquals(Collections.emptySet(), subscriptionListener.revoked); - assertEquals(1, subscriptionListener.assignedCount); - assertEquals(Collections.singleton(tp), subscriptionListener.assigned); - } - - @Test(expected = ApiException.class) - public void testUnknownPartitionAssignmentStrategy() { - subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener); - subscriptions.needReassignment(); - - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // coordinator doesn't like our assignment strategy - client.prepareResponse(joinGroupResponse(0, "consumer", Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code())); - coordinator.ensurePartitionAssignment(); - } - - @Test(expected = ApiException.class) - public void testInvalidSessionTimeout() { - subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener); - subscriptions.needReassignment(); - - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // coordinator doesn't like our assignment strategy - client.prepareResponse(joinGroupResponse(0, "consumer", Collections.<TopicPartition>emptyList(), Errors.INVALID_SESSION_TIMEOUT.code())); - coordinator.ensurePartitionAssignment(); - } - - @Test - public void testCommitOffsetOnly() { - subscriptions.assign(Arrays.asList(tp)); - - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - 
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - - AtomicBoolean success = new AtomicBoolean(false); - coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success)); - consumerClient.poll(0); - assertTrue(success.get()); - - assertEquals(100L, subscriptions.committed(tp).offset()); - } - - @Test - public void testCommitOffsetMetadata() { - subscriptions.assign(Arrays.asList(tp)); - - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - - AtomicBoolean success = new AtomicBoolean(false); - coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success)); - consumerClient.poll(0); - assertTrue(success.get()); - - assertEquals(100L, subscriptions.committed(tp).offset()); - assertEquals("hello", subscriptions.committed(tp).metadata()); - } - - @Test - public void testCommitOffsetAsyncWithDefaultCallback() { - int invokedBeforeTest = defaultOffsetCommitCallback.invoked; - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null); - consumerClient.poll(0); - assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked); - assertNull(defaultOffsetCommitCallback.exception); - } - - @Test - public void testResetGeneration() { - // enable auto-assignment - subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener); - - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - client.prepareResponse(joinGroupResponse(1, "consumer", 
Collections.singletonList(tp), Errors.NONE.code())); - coordinator.ensurePartitionAssignment(); - - // now switch to manual assignment - subscriptions.unsubscribe(); - coordinator.resetGeneration(); - subscriptions.assign(Arrays.asList(tp)); - - // the client should not reuse generation/consumerId from auto-subscribed generation - client.prepareResponse(new MockClient.RequestMatcher() { - @Override - public boolean matches(ClientRequest request) { - OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body()); - return commitRequest.consumerId().equals(OffsetCommitRequest.DEFAULT_CONSUMER_ID) && - commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID; - } - }, offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - - AtomicBoolean success = new AtomicBoolean(false); - coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success)); - consumerClient.poll(0); - assertTrue(success.get()); - } - - @Test - public void testCommitOffsetAsyncFailedWithDefaultCallback() { - int invokedBeforeTest = defaultOffsetCommitCallback.invoked; - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()))); - coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null); - consumerClient.poll(0); - assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked); - assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception); - } - - @Test - public void testCommitOffsetAsyncCoordinatorNotAvailable() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // async commit with coordinator not available - MockCommitCallback cb = new 
MockCommitCallback(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()))); - coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb); - consumerClient.poll(0); - - assertTrue(coordinator.coordinatorUnknown()); - assertEquals(1, cb.invoked); - assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), cb.exception); - } - - @Test - public void testCommitOffsetAsyncNotCoordinator() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // async commit with not coordinator - MockCommitCallback cb = new MockCommitCallback(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code()))); - coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb); - consumerClient.poll(0); - - assertTrue(coordinator.coordinatorUnknown()); - assertEquals(1, cb.invoked); - assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.exception(), cb.exception); - } - - @Test - public void testCommitOffsetAsyncDisconnected() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // async commit with coordinator disconnected - MockCommitCallback cb = new MockCommitCallback(); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true); - coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb); - consumerClient.poll(0); - - assertTrue(coordinator.coordinatorUnknown()); - assertEquals(1, cb.invoked); - assertTrue(cb.exception instanceof DisconnectException); - } - - @Test - public void testCommitOffsetSyncNotCoordinator() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // sync commit with coordinator 
disconnected (should connect, get metadata, and then submit the commit request) - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code()))); - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); - } - - @Test - public void testCommitOffsetSyncCoordinatorNotAvailable() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request) - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()))); - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); - } - - @Test - public void testCommitOffsetSyncCoordinatorDisconnected() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request) - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true); - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); - } - - @Test(expected = OffsetMetadataTooLarge.class) - public void testCommitOffsetMetadataTooLarge() { - // since offset metadata is 
provided by the user, we have to propagate the exception so they can handle it - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code()))); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata"))); - } - - @Test(expected = ApiException.class) - public void testCommitOffsetSyncCallbackWithNonRetriableException() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - // sync commit with invalid partitions should throw if we have no callback - client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code())), false); - coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L))); - } - - @Test - public void testRefreshOffset() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - subscriptions.assign(Arrays.asList(tp)); - subscriptions.needRefreshCommits(); - client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); - coordinator.refreshCommittedOffsetsIfNeeded(); - assertFalse(subscriptions.refreshCommitsNeeded()); - assertEquals(100L, subscriptions.committed(tp).offset()); - } - - @Test - public void testRefreshOffsetLoadInProgress() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - subscriptions.assign(Arrays.asList(tp)); - subscriptions.needRefreshCommits(); - client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L)); - client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); - coordinator.refreshCommittedOffsetsIfNeeded(); - 
assertFalse(subscriptions.refreshCommitsNeeded()); - assertEquals(100L, subscriptions.committed(tp).offset()); - } - - @Test - public void testRefreshOffsetNotCoordinatorForConsumer() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - subscriptions.assign(Arrays.asList(tp)); - subscriptions.needRefreshCommits(); - client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L)); - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); - coordinator.refreshCommittedOffsetsIfNeeded(); - assertFalse(subscriptions.refreshCommitsNeeded()); - assertEquals(100L, subscriptions.committed(tp).offset()); - } - - @Test - public void testRefreshOffsetWithNoFetchableOffsets() { - client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorKnown(); - - subscriptions.assign(Arrays.asList(tp)); - subscriptions.needRefreshCommits(); - client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L)); - coordinator.refreshCommittedOffsetsIfNeeded(); - assertFalse(subscriptions.refreshCommitsNeeded()); - assertEquals(null, subscriptions.committed(tp)); - } - - private Struct consumerMetadataResponse(Node node, short error) { - ConsumerMetadataResponse response = new ConsumerMetadataResponse(error, node); - return response.toStruct(); - } - - private Struct heartbeatResponse(short error) { - HeartbeatResponse response = new HeartbeatResponse(error); - return response.toStruct(); - } - - private Struct joinGroupResponse(int generationId, String consumerId, List<TopicPartition> assignedPartitions, short error) { - JoinGroupResponse response = new JoinGroupResponse(error, generationId, consumerId, assignedPartitions); - return response.toStruct(); - } - - private Struct offsetCommitResponse(Map<TopicPartition, Short> 
responseData) { - OffsetCommitResponse response = new OffsetCommitResponse(responseData); - return response.toStruct(); - } - - private Struct offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) { - OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error); - OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data)); - return response.toStruct(); - } - - private OffsetCommitCallback callback(final AtomicBoolean success) { - return new OffsetCommitCallback() { - @Override - public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { - if (exception == null) - success.set(true); - } - }; - } - - private static class MockCommitCallback implements OffsetCommitCallback { - public int invoked = 0; - public Exception exception = null; - - @Override - public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { - invoked++; - this.exception = exception; - } - } - - private static class MockRebalanceListener implements ConsumerRebalanceListener { - public Collection<TopicPartition> revoked; - public Collection<TopicPartition> assigned; - public int revokedCount = 0; - public int assignedCount = 0; - - - @Override - public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - this.assigned = partitions; - assignedCount++; - } - - @Override - public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - this.revoked = partitions; - revokedCount++; - } - - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 4929449..8773f8c 100644 
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -350,11 +350,11 @@ public class FetcherTest { @Test public void testGetAllTopics() throws InterruptedException { - // sending response before request, as getAllTopics is a blocking call + // sending response before request, as getTopicMetadata is a blocking call client.prepareResponse( new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct()); - Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopics(5000L); + Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L); assertEquals(cluster.topics().size(), allTopics.size()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java new file mode 100644 index 0000000..40ae661 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.TopicPartition; + +import java.util.List; +import java.util.Map; + +public class MockPartitionAssignor extends AbstractPartitionAssignor { + + private Map<String, List<TopicPartition>> result = null; + + @Override + public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, + Map<String, List<String>> subscriptions) { + if (result == null) + throw new IllegalStateException("Call to assign with no result prepared"); + return result; + } + + @Override + public String name() { + return "consumer-mock-assignor"; + } + + public void clear() { + this.result = null; + } + + public void prepare(Map<String, List<TopicPartition>> result) { + this.result = result; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index cabf591..fb21802 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -29,9 +29,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import 
java.util.Map; -import java.util.HashSet; import java.util.Set; import static org.junit.Assert.assertEquals; @@ -151,11 +151,11 @@ public class RequestResponseTest { } private AbstractRequest createConsumerMetadataRequest() { - return new ConsumerMetadataRequest("test-group"); + return new GroupMetadataRequest("test-group"); } private AbstractRequestResponse createConsumerMetadataResponse() { - return new ConsumerMetadataResponse((short) 1, new Node(10, "host1", 2014)); + return new GroupMetadataResponse(Errors.NONE.code(), new Node(10, "host1", 2014)); } private AbstractRequest createFetchRequest() { @@ -180,11 +180,17 @@ public class RequestResponseTest { } private AbstractRequest createJoinGroupRequest() { - return new JoinGroupRequest("group1", 30000, Arrays.asList("topic1"), "consumer1", "strategy1"); + ByteBuffer metadata = ByteBuffer.wrap(new byte[] {}); + List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>(); + protocols.add(new JoinGroupRequest.GroupProtocol("consumer-range", metadata)); + return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols); } private AbstractRequestResponse createJoinGroupResponse() { - return new JoinGroupResponse(Errors.NONE.code(), 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1))); + Map<String, ByteBuffer> members = new HashMap<>(); + members.put("consumer1", ByteBuffer.wrap(new byte[]{})); + members.put("consumer2", ByteBuffer.wrap(new byte[]{})); + return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members); } private AbstractRequest createLeaveGroupRequest() { http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java index 1ff5e73..eb62c9e 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java @@ -340,7 +340,7 @@ public class KafkaBasedLogTest { consumer.schedulePollTask(new Runnable() { @Override public void run() { - consumer.setException(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception()); + consumer.setException(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception()); } }); http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/admin/AclCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 6a8a8a2..fd6d420 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -31,7 +31,7 @@ object AclCommand { val Newline = scala.util.Properties.lineSeparator val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] ( Topic -> Set(Read, Write, Describe), - ConsumerGroup -> Set(Read), + Group -> Set(Read), Cluster -> Set(Create, ClusterAction) ) @@ -146,14 +146,14 @@ object AclCommand { val resources = getResource(opts) val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic) - val consumerGroups: Set[Resource] = resources.filter(_.resourceType == ConsumerGroup) + val groups: Set[Resource] = resources.filter(_.resourceType == Group) //Read,Describe on topic, Read on consumerGroup + Create on cluster val acls = getAcl(opts, Set(Read, Describe)) topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++ - consumerGroups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]] + groups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]] } private def getCliResourceToAcls(opts: AclCommandOptions): 
Map[Resource, Set[Acl]] = { @@ -221,10 +221,10 @@ object AclCommand { resources += Resource.ClusterResource if (opts.options.has(opts.groupOpt)) - opts.options.valuesOf(opts.groupOpt).asScala.foreach(consumerGroup => resources += new Resource(ConsumerGroup, consumerGroup.trim)) + opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resources += new Resource(Group, group.trim)) if (resources.isEmpty && dieIfNoResourceFound) - CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --consumer-group <group>") + CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group>") resources } @@ -266,16 +266,16 @@ object AclCommand { .withValuesSeparatedBy(Delimiter) val clusterOpt = parser.accepts("cluster", "Add/Remove cluster acls.") - val groupOpt = parser.accepts("consumer-group", "Comma separated list of consumer groups to which the acls should be added or removed. " + - "A value of * indicates the acls should apply to all consumer-groups.") + val groupOpt = parser.accepts("group", "Comma separated list of groups to which the acls should be added or removed. 
" + + "A value of * indicates the acls should apply to all groups.") .withRequiredArg - .describedAs("consumer-group") + .describedAs("group") .ofType(classOf[String]) .withValuesSeparatedBy(Delimiter) val addOpt = parser.accepts("add", "Indicates you are trying to add acls.") val removeOpt = parser.accepts("remove", "Indicates you are trying to remove acls.") - val listOpt = parser.accepts("list", "List acls for the specified resource, use --topic <topic> or --consumer-group <group> or --cluster to specify a resource.") + val listOpt = parser.accepts("list", "List acls for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.") val operationsOpt = parser.accepts("operations", "Comma separated list of operations, default is All. Valid operation names are: " + Newline + Operation.values.map("\t" + _).mkString(Newline) + Newline) @@ -320,7 +320,7 @@ object AclCommand { "This will generate acls that allows WRITE,DESCRIBE on topic and CREATE on cluster. ") val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove acls for consumer role. 
" + - "This will generate acls that allows READ,DESCRIBE on topic and READ on consumer-group.") + "This will generate acls that allows READ,DESCRIBE on topic and READ on group.") val helpOpt = parser.accepts("help", "Print usage information.") @@ -343,7 +343,7 @@ object AclCommand { CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic") if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && options.has(clusterOpt)))) - CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --consumer-group and no --cluster option should be specified.") + CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --group and no --cluster option should be specified.") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index e6ca112..ed54aee 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -23,16 +23,15 @@ import kafka.common.{Topic, AdminCommandFailedException} import kafka.utils.CommandLineUtils import kafka.utils._ import kafka.utils.ZkUtils._ -import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import scala.collection._ import scala.collection.JavaConversions._ import kafka.log.{Defaults, LogConfig} import kafka.consumer.{ConsumerConfig, Whitelist} -import kafka.server.{ConfigType, OffsetManager} +import kafka.server.ConfigType import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.security.JaasUtils -import kafka.coordinator.ConsumerCoordinator +import kafka.coordinator.GroupCoordinator object TopicCommand extends Logging { @@ -130,7 
+129,7 @@ object TopicCommand extends Logging { } if(opts.options.has(opts.partitionsOpt)) { - if (topic == ConsumerCoordinator.OffsetsTopicName) { + if (topic == GroupCoordinator.OffsetsTopicName) { throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.") } println("WARNING: If partitions are increased for a topic that has a key, the partition " + http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala deleted file mode 100644 index 258d5fe..0000000 --- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.api - -import java.nio.ByteBuffer - -import kafka.common.ErrorMapping -import kafka.network.{RequestOrResponseSend, RequestChannel} -import kafka.network.RequestChannel.Response - -object ConsumerMetadataRequest { - val CurrentVersion = 0.shortValue - val DefaultClientId = "" - - def readFrom(buffer: ByteBuffer) = { - // envelope - val versionId = buffer.getShort - val correlationId = buffer.getInt - val clientId = ApiUtils.readShortString(buffer) - - // request - val group = ApiUtils.readShortString(buffer) - ConsumerMetadataRequest(group, versionId, correlationId, clientId) - } - -} - -case class ConsumerMetadataRequest(group: String, - versionId: Short = ConsumerMetadataRequest.CurrentVersion, - correlationId: Int = 0, - clientId: String = ConsumerMetadataRequest.DefaultClientId) - extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey)) { - - def sizeInBytes = - 2 + /* versionId */ - 4 + /* correlationId */ - ApiUtils.shortStringLength(clientId) + - ApiUtils.shortStringLength(group) - - def writeTo(buffer: ByteBuffer) { - // envelope - buffer.putShort(versionId) - buffer.putInt(correlationId) - ApiUtils.writeShortString(buffer, clientId) - - // consumer metadata request - ApiUtils.writeShortString(buffer, group) - } - - override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { - // return ConsumerCoordinatorNotAvailable for all uncaught errors - val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) - } - - def describe(details: Boolean) = { - val consumerMetadataRequest = new StringBuilder - consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName) - consumerMetadataRequest.append("; Version: " + versionId) - consumerMetadataRequest.append("; CorrelationId: " + correlationId) - 
consumerMetadataRequest.append("; ClientId: " + clientId) - consumerMetadataRequest.append("; Group: " + group) - consumerMetadataRequest.toString() - } -} \ No newline at end of file
