Repository: kafka Updated Branches: refs/heads/trunk 88fdca26a -> c2d9b95f3
http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala index 629020e..e3dca32 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala @@ -233,12 +233,12 @@ class GroupMetadataManagerTest { assertEquals(Some(Errors.NONE.code), maybeError) assertTrue(group.hasOffsets) - val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition)) + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition))) val maybePartitionResponse = cachedOffsets.get(topicPartition) assertFalse(maybePartitionResponse.isEmpty) val partitionResponse = maybePartitionResponse.get - assertEquals(Errors.NONE.code, partitionResponse.errorCode) + assertEquals(Errors.NONE, partitionResponse.error) assertEquals(offset, partitionResponse.offset) } @@ -317,7 +317,7 @@ class GroupMetadataManagerTest { assertEquals(Some(expectedError.code), maybeError) assertFalse(group.hasOffsets) - val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition)) + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition))) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset)) EasyMock.verify(replicaManager) @@ -372,7 +372,7 @@ class GroupMetadataManagerTest { assertEquals(None, group.offset(topicPartition1)) assertEquals(Some(offset), group.offset(topicPartition2).map(_.offset)) - val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2)) + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2))) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset)) assertEquals(Some(offset), cachedOffsets.get(topicPartition2).map(_.offset)) } @@ -418,7 +418,7 @@ class GroupMetadataManagerTest { // the full group should be gone since all offsets were removed assertEquals(None, groupMetadataManager.getGroup(groupId)) - val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2)) + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2))) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset)) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset)) } @@ -464,7 +464,7 @@ class GroupMetadataManagerTest { // the full group should be gone since all offsets were removed assertEquals(None, groupMetadataManager.getGroup(groupId)) - val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2)) + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2))) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset)) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset)) } @@ -535,7 +535,7 @@ class GroupMetadataManagerTest { // the full group should be gone since all offsets were removed assertEquals(None, groupMetadataManager.getGroup(groupId)) - val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2)) + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2))) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset)) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset)) } @@ -599,7 +599,7 @@ class GroupMetadataManagerTest { assertEquals(None, group.offset(topicPartition1)) assertEquals(None, group.offset(topicPartition2)) - val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2)) + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2))) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset)) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/c2d9b95f/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index aef29bc..6ca784a 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -234,7 +234,9 @@ class OffsetCommitTest extends ZooKeeperTestHarness { assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset) // committed offset should exist with fetch version 0 - assertEquals(1L, simpleConsumer.fetchOffsets(OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)), versionId = 0)).requestInfo.get(topicPartition).get.offset) + val offsetFetchReq = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)), versionId = 0) + val offsetFetchResp = simpleConsumer.fetchOffsets(offsetFetchReq) + assertEquals(1L, offsetFetchResp.requestInfo.get(topicPartition).get.offset) // v1 version commit request with commit timestamp set to -1