junrao commented on code in PR #13046: URL: https://github.com/apache/kafka/pull/13046#discussion_r1063005106
########## core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala: ########## @@ -243,282 +245,287 @@ class LeaderEpochFileCacheTest { //Given val cache = new LeaderEpochFileCache(tp, checkpoint) - cache.assign(epoch = 2, startOffset = 6) + cache.assign(2, 6) //When val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath)) val cache2 = new LeaderEpochFileCache(tp, checkpoint2) //Then assertEquals(1, cache2.epochEntries.size) - assertEquals(EpochEntry(2, 6), cache2.epochEntries.toList(0)) + assertEquals(new EpochEntry(2, 6), cache2.epochEntries.get(0)) } @Test def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = { //Given - cache.assign(epoch = 1, startOffset = 5); + cache.assign(1, 5); var logEndOffset = 6 - cache.assign(epoch = 2, startOffset = 6); + cache.assign(2, 6); logEndOffset = 7 //When we update an epoch in the past with a different offset, the log has already reached //an inconsistent state. Our options are either to raise an error, ignore the new append, //or truncate the cached epochs to the point of conflict. We take this latter approach in //order to guarantee that epochs and offsets in the cache increase monotonically, which makes //the search logic simpler to reason about. - cache.assign(epoch = 1, startOffset = 7); + cache.assign(1, 7); logEndOffset = 8 //Then later epochs will be removed - assertEquals(Some(1), cache.latestEpoch) + assertEquals(Optional.of(1), cache.latestEpoch) //Then end offset for epoch 1 will have changed - assertEquals((1, 8), cache.endOffsetFor(1, logEndOffset)) + assertEquals((1, 8), toTuple(cache.endOffsetFor(1, logEndOffset))) //Then end offset for epoch 2 is now undefined - assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(2, logEndOffset)) - assertEquals(EpochEntry(1, 7), cache.epochEntries(0)) + assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), toTuple(cache.endOffsetFor(2, logEndOffset))) + assertEquals(new EpochEntry(1, 7), cache.epochEntries.get(0)) + } + + def toTuple[K, V](entry: java.util.Map.Entry[K, V]): (K, V) = { Review Comment: Could this be private? ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1294,36 +1293,53 @@ class UnifiedLog(@volatile var logStartOffset: Long, // The first cached epoch usually corresponds to the log start offset, but we have to verify this since // it may not be true following a message format version bump as the epoch will not be available for // log entries written in the older format. - val earliestEpochEntry = leaderEpochCache.flatMap(_.earliestEntry) - val epochOpt = earliestEpochEntry match { - case Some(entry) if entry.startOffset <= logStartOffset => Optional.of[Integer](entry.epoch) - case _ => Optional.empty[Integer]() + val earliestEpochEntry = leaderEpochCache match { + case Some(cache) => cache.earliestEntry() + case None => Optional.empty[EpochEntry]() } + + val epochOpt = if (earliestEpochEntry.isPresent && earliestEpochEntry.get().startOffset <= logStartOffset) { + Optional.of[Integer](earliestEpochEntry.get().epoch) + } else Optional.empty[Integer]() + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt)) } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() - val earliestLocalLogEpochEntry = leaderEpochCache.flatMap(cache => - cache.epochForOffset(curLocalLogStartOffset).flatMap(cache.epochEntry)) - val epochOpt = earliestLocalLogEpochEntry match { - case Some(entry) if entry.startOffset <= curLocalLogStartOffset => Optional.of[Integer](entry.epoch) - case _ => Optional.empty[Integer]() + + val earliestLocalLogEpochEntry: Optional[EpochEntry] = leaderEpochCache match { + case Some(cache) => + val value = cache.epochForOffset(curLocalLogStartOffset) + if (value.isPresent) cache.epochEntry(value.get) else Optional.empty[EpochEntry]() + case None => Optional.empty[EpochEntry]() } + + val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset) + Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch) + else Optional.empty[Integer]() + Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt)) } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { - val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer]) - val epochOptional = Optional.ofNullable(latestEpochOpt.orNull) - Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional)) + Review Comment: extra newline ########## storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochCheckpoint.java: ########## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import java.util.List; + +public interface LeaderEpochCheckpoint { Review Comment: Yes, the proposed package names sound good to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org