Repository: kafka Updated Branches: refs/heads/trunk 17ce2a730 -> 020ca7903
http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index f65884e..5dfcb63 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -94,7 +94,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { val log = logManager.getLog(new TopicPartition(topic, part)).get for (_ <- 0 until 20) - log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes())) + log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) log.flush() log.maybeIncrementLogStartOffset(3) @@ -128,7 +128,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { val log = logManager.getLog(new TopicPartition(topic, part)).get for (_ <- 0 until 20) - log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes())) + log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) log.flush() val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime, 15) @@ -189,7 +189,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig) for (_ <- 0 until 20) - log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes())) + log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) log.flush() val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs @@ -217,7 +217,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { val logManager = server.getLogManager val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig) for (_ <- 0 until 20) - log.append(TestUtils.singletonRecords(value = Integer.toString(42).getBytes())) + log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0) log.flush() val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.EarliestTime, 10) http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index d4118c1..b0e81a9 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -116,7 +116,7 @@ class ReplicaFetcherThreadTest { //Stubs expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes() - expect(leaderEpochs.latestUsedEpoch).andReturn(5) + expect(leaderEpochs.latestEpoch).andReturn(5) expect(replicaManager.logManager).andReturn(logManager).anyTimes() stub(replica, replicaManager) @@ -174,7 +174,7 @@ class ReplicaFetcherThreadTest { expect(logManager.truncateTo(capture(truncateToCapture))).once expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes() - expect(leaderEpochs.latestUsedEpoch).andReturn(5).anyTimes() + expect(leaderEpochs.latestEpoch).andReturn(5).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes() stub(replica, replicaManager) @@ -220,7 +220,7 @@ class ReplicaFetcherThreadTest { expect(logManager.truncateTo(capture(truncated))).once expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes() - expect(leaderEpochs.latestUsedEpoch).andReturn(5) + expect(leaderEpochs.latestEpoch).andReturn(5) expect(replicaManager.logManager).andReturn(logManager).anyTimes() stub(replica, replicaManager) replay(leaderEpochs, replicaManager, logManager, quota, replica) @@ -263,7 +263,7 @@ class ReplicaFetcherThreadTest { expect(logManager.truncateTo(capture(truncated))).anyTimes() expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes() - expect(leaderEpochs.latestUsedEpoch).andReturn(5) + expect(leaderEpochs.latestEpoch).andReturn(5) expect(replicaManager.logManager).andReturn(logManager).anyTimes() stub(replica, replicaManager) replay(leaderEpochs, replicaManager, logManager, quota, replica) @@ -312,7 +312,7 @@ class ReplicaFetcherThreadTest { //Stub return values expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes() - expect(leaderEpochs.latestUsedEpoch).andReturn(5) + expect(leaderEpochs.latestEpoch).andReturn(5) expect(replicaManager.logManager).andReturn(logManager).anyTimes() stub(replica, replicaManager) @@ -358,7 +358,7 @@ class ReplicaFetcherThreadTest { expect(logManager.truncateTo(capture(truncateToCapture))).once expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes() - expect(leaderEpochs.latestUsedEpoch).andReturn(5) + expect(leaderEpochs.latestEpoch).andReturn(5) expect(replicaManager.logManager).andReturn(logManager).anyTimes() stub(replica, replicaManager) http://git-wip-us.apache.org/repos/asf/kafka/blob/020ca790/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index 1a24c34..afd1f35 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -48,28 +48,10 @@ class LeaderEpochFileCacheTest { leo = 11 //Then - assertEquals(2, cache.latestUsedEpoch()) + assertEquals(2, cache.latestEpoch()) assertEquals(EpochEntry(2, 10), cache.epochEntries()(0)) assertEquals(11, cache.endOffsetFor(2)) //should match leo } - - @Test - def shouldUpdateEpochWithLogEndOffset() = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - - //Given - leo = 9 - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - - //When - cache.cacheLatestEpoch(2) - cache.maybeAssignLatestCachedEpochToLeo() - - //Then - assertEquals(2, cache.latestUsedEpoch()) - assertEquals(EpochEntry(2, 9), cache.epochEntries()(0)) - } @Test def shouldReturnLogEndOffsetIfLatestEpochRequested() = { @@ -113,38 +95,28 @@ class LeaderEpochFileCacheTest { leo = 9 val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.cacheLatestEpoch(2) - cache.maybeAssignLatestCachedEpochToLeo() + cache.assign(2, leo) //When called again later - leo = 10 - cache.cacheLatestEpoch(2) - cache.maybeAssignLatestCachedEpochToLeo() + cache.assign(2, 10) //Then the offset should NOT have been updated - assertEquals(9, cache.epochEntries()(0).startOffset) + assertEquals(leo, cache.epochEntries()(0).startOffset) } @Test def shouldAllowLeaderEpochToChangeEvenIfOffsetDoesNot() = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) + def leoFinder() = new LogOffsetMetadata(0) //Given - leo = 9 val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) + cache.assign(2, 9) - cache.cacheLatestEpoch(2) - cache.maybeAssignLatestCachedEpochToLeo() - - //When update epoch with same leo - cache.cacheLatestEpoch(3) - cache.maybeAssignLatestCachedEpochToLeo() + //When update epoch new epoch but same offset + cache.assign(3, 9) - //Then the offset should NOT have been updated - assertEquals(9, cache.endOffsetFor(3)) - assertEquals(9, cache.endOffsetFor(2)) - assertEquals(3, cache.latestUsedEpoch()) + //Then epoch should have been updated + assertEquals(ListBuffer(EpochEntry(2, 9), EpochEntry(3, 9)), cache.epochEntries()) } @Test @@ -168,7 +140,6 @@ class LeaderEpochFileCacheTest { val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) //Then - assertEquals(UNDEFINED_EPOCH, cache.latestUsedEpoch()) assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(0)) } @@ -253,8 +224,7 @@ class LeaderEpochFileCacheTest { val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) //When - cache.cacheLatestEpoch(epoch = 2) - cache.maybeAssignLatestCachedEpochToLeo() + cache.assign(epoch = 2, offset = 100) //Then assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(3)) @@ -312,7 +282,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 1, offset = 7); leo = 8 //Then epoch should not be changed - assertEquals(2, cache.latestUsedEpoch()) + assertEquals(2, cache.latestEpoch()) //Then end offset for epoch 1 shouldn't have changed assertEquals(6, cache.endOffsetFor(1)) @@ -347,18 +317,16 @@ class LeaderEpochFileCacheTest { //Given val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.cacheLatestEpoch(epoch = 0) //leo=0 - cache.maybeAssignLatestCachedEpochToLeo() + cache.assign(epoch = 0, offset = 0) //leo=0 //When - cache.cacheLatestEpoch(epoch = 1) //leo=0 - cache.maybeAssignLatestCachedEpochToLeo() + cache.assign(epoch = 1, offset = 0) //leo=0 //Then epoch should go up - assertEquals(1, cache.latestUsedEpoch()) + assertEquals(1, cache.latestEpoch()) //offset for 1 should still be 0 assertEquals(0, cache.endOffsetFor(1)) - //offset for 0 should the start offset of epoch(1) => 0 + //offset for epoch 0 should still be 0 assertEquals(0, cache.endOffsetFor(0)) //When we write 5 messages as epoch 1 @@ -366,12 +334,12 @@ class LeaderEpochFileCacheTest { //Then end offset for epoch(1) should be leo => 5 assertEquals(5, cache.endOffsetFor(1)) - //Epoch(0) should still show the start offset for Epoch(1) => 0 + //Epoch 0 should still be at offset 0 assertEquals(0, cache.endOffsetFor(0)) //When - cache.cacheLatestEpoch(epoch = 2) //leo=5 - cache.maybeAssignLatestCachedEpochToLeo() + cache.assign(epoch = 2, offset = 5) //leo=5 + leo = 10 //write another 5 messages //Then end offset for epoch(2) should be leo => 10 @@ -398,7 +366,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 0, offset = 2); leo = 3 //Then epoch should stay, offsets should grow - assertEquals(0, cache.latestUsedEpoch()) + assertEquals(0, cache.latestEpoch()) assertEquals(leo, cache.endOffsetFor(0)) //When messages arrive with greater epoch @@ -406,7 +374,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 1, offset = 4); leo = 5 cache.assign(epoch = 1, offset = 5); leo = 6 - assertEquals(1, cache.latestUsedEpoch()) + assertEquals(1, cache.latestEpoch()) assertEquals(leo, cache.endOffsetFor(1)) //When @@ -414,7 +382,7 @@ class LeaderEpochFileCacheTest { cache.assign(epoch = 2, offset = 7); leo = 8 cache.assign(epoch = 2, offset = 8); leo = 9 - assertEquals(2, cache.latestUsedEpoch()) + assertEquals(2, cache.latestEpoch()) assertEquals(leo, cache.endOffsetFor(2)) //Older epochs should return the start offset of the first message in the subsequent epoch. @@ -589,7 +557,7 @@ class LeaderEpochFileCacheTest { cache.clearLatest(offset = 9) //Then should keep the preceding epochs - assertEquals(3, cache.latestUsedEpoch()) + assertEquals(3, cache.latestEpoch()) assertEquals(ListBuffer(EpochEntry(2, 6), EpochEntry(3, 8)), cache.epochEntries) } @@ -653,7 +621,7 @@ class LeaderEpochFileCacheTest { val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) //Then - assertEquals(-1, cache.latestUsedEpoch) + assertEquals(-1, cache.latestEpoch) } @Test @@ -692,28 +660,6 @@ class LeaderEpochFileCacheTest { cache.clearLatest(7) } - @Test - def shouldUpdateEpochCacheOnLeadershipChangeThenCommit(): Unit ={ - //Given - def leoFinder() = new LogOffsetMetadata(5) - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - - //When - cache.cacheLatestEpoch(2) - - //Then - assertEquals(UNDEFINED_EPOCH, cache.latestUsedEpoch()) - - //When - cache.maybeAssignLatestCachedEpochToLeo() - - //Then should have saved epoch - assertEquals(2, cache.latestUsedEpoch()) - - //Then should have applied LEO to epoch - assertEquals(5, cache.endOffsetFor(2)) - } - @Before def setUp() { checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile())
