[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

2023-01-06 Thread GitBox


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1063709428


##
storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochFileCache.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica.
+ *
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+    private final LeaderEpochCheckpoint checkpoint;
+    private final Logger log;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final TreeMap<Integer, EpochEntry> epochs = new TreeMap<>();
+
+    /**
+     * @param topicPartition the associated topic partition
+     * @param checkpoint the checkpoint file
+     */
+    public LeaderEpochFileCache(TopicPartition topicPartition, LeaderEpochCheckpoint checkpoint) {
+        this.checkpoint = checkpoint;
+        LogContext logContext = new LogContext("[LeaderEpochCache " + topicPartition + "] ");
+        log = logContext.logger(LeaderEpochFileCache.class);
+        checkpoint.read().forEach(this::assign);
+    }
+
+    /**
+     * Assigns the supplied Leader Epoch to the supplied Offset
+     * Once the epoch is assigned it cannot be reassigned
+     */
+    public void assign(int epoch, long startOffset) {
+        EpochEntry entry = new EpochEntry(epoch, startOffset);
+        if (assign(entry)) {
+            log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
+            flush();
+        }
+    }
+
+    public void assign(List<EpochEntry> entries) {
+        entries.forEach(entry -> {
+            if (assign(entry)) {
+                log.debug("Appended new epoch entry {}. Cache now contains {} entries.", entry, epochs.size());
+            }
+        });
+        flush();
+    }
+
+    private boolean isUpdateNeeded(EpochEntry entry) {
+        Optional<EpochEntry> lastEntry = latestEntry();
+        return lastEntry.map(epochEntry -> entry.epoch != epochEntry.epoch || entry.startOffset < epochEntry.startOffset).orElse(true);
+    }
+
+    private boolean assign(EpochEntry entry) {
+        if (entry.epoch < 0 || entry.startOffset < 0) {
+            throw new IllegalArgumentException("Received invalid partition leader epoch entry " + entry);
+        }
+
+        // Check whether the append is needed before acquiring the write lock
+        // in order to avoid contention with readers in the common case
+        if (!isUpdateNeeded(entry)) return false;
+
+        lock.writeLock().lock();
+        try {
+            if (isUpdateNeeded(entry)) {
+                maybeTruncateNonMonotonicEntries(entry);
+                epochs.put(entry.epoch, entry);
+                return true;
+            } else {
+                return false;
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Remove any entries which violate monotonicity prior to appending a new entry
+     */
+    public void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
+        List<EpochEntry> removedEpochs = removeFromEnd(entry -> entry.epoch >= newEntry.epoch || entry.startOffset >= newEntry.startOffset);
+
+
+        if (removedEpochs.size() > 1 || (!removedEpochs.isEmpty() && removedEpochs.get(0).startOffset != newEntry.startOffset)) {
+
+

[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

2023-01-06 Thread GitBox


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1063708632


##
core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala:
##
@@ -65,9 +65,14 @@ class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureCh
   val checkpoint = new CheckpointFileWithFailureHandler[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
     OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
 
-  def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets)
+  def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.map{case (k, v)=> Tuple2(k, v)}.toSeq.asJava)

Review Comment:
   The `map` operation is a no-op here since we are converting a tuple to a tuple.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

2023-01-06 Thread GitBox


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1063705498


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -995,11 +994,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
   }
 
-  def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch)
+  def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch.asScala).map(Int.unbox(_))

Review Comment:
   Yes, if all usages are in Scala, it makes sense to keep it for now.






[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

2023-01-04 Thread GitBox


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1062119225


##
storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochCheckpoint.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.List;
+
+public interface LeaderEpochCheckpoint {

Review Comment:
   I originally also preferred having `server` in the package name, but on 
further thought, there are advantages in basically having the module name as 
the package name after `kafka`. It avoids accidental split packages across 
modules, which can cause problems if we ever decide to use Java 9 modules. And 
it means that a given server module could eventually be used by clients too if 
it made sense (without forcing package renames or weird package names).
   
   If we agree on that approach, then the package names would be 
`org.apache.kafka.storage.internals.checkpoint` and 
`org.apache.kafka.storage.internals.log` (one annoying aspect of using `log` is 
that GitHub filters it from their code navigation, but we probably have to live 
with that for now).
   
   @junrao what are your thoughts?






[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

2022-12-23 Thread GitBox


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1056752254


##
storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochCheckpoint.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.List;
+
+public interface LeaderEpochCheckpoint {

Review Comment:
   Thinking about this some more, to remain consistent with other packages like 
`raft`, `metadata` and others, we could skip the `server` and just have 
`org.apache.kafka.storage.internals.*` for the non-API classes.






[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

2022-12-23 Thread GitBox


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1056750161


##
storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochCheckpoint.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.List;
+
+public interface LeaderEpochCheckpoint {

Review Comment:
   It makes sense to me that it should be. But perhaps the package structure 
for this module should be `org.apache.kafka.server.storage.internals.*` versus 
the one we've been using for the log layer. Maybe we can use this approach for 
the checkpoint code and move the log layer all at once at the end.






[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

2022-12-23 Thread GitBox


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1056748447


##
storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochCheckpoint.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.List;
+
+public interface LeaderEpochCheckpoint {

Review Comment:
   That said, should this be in the `storage` module at all?






[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

2022-12-23 Thread GitBox


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1056748412


##
storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochCheckpoint.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.List;
+
+public interface LeaderEpochCheckpoint {

Review Comment:
   Looks like this was in the checkpoints package previously. Would it make 
sense to keep that distinction here? It could be 
`org.apache.kafka.server.checkpoint.internals` or something along those lines.






[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

2022-12-23 Thread GitBox


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1056748020


##
storage/src/main/java/org/apache/kafka/server/log/internals/EpochEntry.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+
+// Mapping of epoch to the first offset of the subsequent epoch
+public class EpochEntry {
+    public final int epoch;
+    public final long startOffset;
+
+    public EpochEntry(int epoch, long startOffset) {
+        this.epoch = epoch;
+        this.startOffset = startOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        EpochEntry that = (EpochEntry) o;
+        return epoch == that.epoch && startOffset == that.startOffset;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(epoch, startOffset);

Review Comment:
   This results in boxing of the two elements and the allocation of an array. 
To avoid unexpected performance regressions, can we please stick with an 
allocation free implementation?






[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move LeaderEpochFileCache and its dependencies to the storage module.

2022-12-23 Thread GitBox


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1056745406


##
core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala:
##
@@ -62,12 +62,17 @@ trait OffsetCheckpoint {
  *  -checkpoint file end--
  */
 class OffsetCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) {
-  val checkpoint = new CheckpointFileWithFailureHandler[(TopicPartition, Long)](file, OffsetCheckpointFile.CurrentVersion,
+  val checkpoint = new CheckpointFileWithFailureHandler[Tuple2[TopicPartition, Long]](file, OffsetCheckpointFile.CurrentVersion,

Review Comment:
   What is the reason for this change?



##
storage/src/main/java/org/apache/kafka/server/log/internals/EpochEntry.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+
+// Mapping of epoch to the first offset of the subsequent epoch
+public class EpochEntry {
+    public final int epoch;
+    public final long startOffset;
+
+    public EpochEntry(int epoch, long startOffset) {
+        this.epoch = epoch;
+        this.startOffset = startOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        EpochEntry that = (EpochEntry) o;
+        return epoch == that.epoch && startOffset == that.startOffset;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(epoch, startOffset);

Review Comment:
   This results in boxing of the two elements and the allocation of an array. 
To avoid unexpected regressions, can we please stick with an allocation free 
implementation?



##
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##
@@ -255,7 +256,7 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
 }
 
 // Get the respective epoch in which the starting-offset exists.
-var maybeEpoch = leaderEpochCache.epochForOffset(startingOffset)
+var maybeEpoch = leaderEpochCache.epochForOffset(startingOffset).asScala

Review Comment:
   Similarly here, can we just use `Optional` instead of converting to `Option`?



##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -2650,7 +2647,7 @@ class PartitionTest extends AbstractPartitionTest {
 assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
 
 val leaderLog = partition.localLogOrException
-assertEquals(Some(EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.flatMap(_.latestEntry))
+assertEquals(Some(new EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.flatMap(_.latestEntry.asScala))

Review Comment:
   We can change the expected value to be `Optional.of` instead of converting 
the response with `asScala`.
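
   As a rough sketch of the idea, written in plain Java rather than the Scala 
test itself: `java.util.Optional` implements `equals` by comparing the 
contained values, so an expected `Optional.of(...)` can be compared with the 
returned `Optional` directly. The classes and the `latestEntry()` helper below 
are hypothetical stand-ins, not code from the PR.

```java
import java.util.Optional;

// Hypothetical demo; the real test is a Scala JUnit test using assertEquals.
final class OptionalExpectationSketch {
    // Stand-in for a cache method that returns java.util.Optional.
    static Optional<EpochEntry> latestEntry() {
        return Optional.of(new EpochEntry(5, 0L));
    }

    public static void main(String[] args) {
        // Build the expected value as an Optional instead of converting
        // the actual value to another option type before comparing.
        Optional<EpochEntry> expected = Optional.of(new EpochEntry(5, 0L));
        if (!expected.equals(latestEntry())) {
            throw new AssertionError("expected " + expected + " but got " + latestEntry());
        }
        System.out.println("latestEntry() matched the expected Optional");
    }
}

// Minimal stand-in for the EpochEntry class under review.
final class EpochEntry {
    final int epoch;
    final long startOffset;

    EpochEntry(int epoch, long startOffset) {
        this.epoch = epoch;
        this.startOffset = startOffset;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        EpochEntry that = (EpochEntry) o;
        return epoch == that.epoch && startOffset == that.startOffset;
    }

    @Override
    public int hashCode() {
        return 31 * epoch + Long.hashCode(startOffset);
    }
}
```

   This only works because `EpochEntry` defines value-based `equals`; without 
it, two separately constructed entries would never compare equal.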



##
storage/src/main/java/org/apache/kafka/server/log/internals/EpochEntry.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+
+// Mapping of epoch to the first offset of the subsequent epoch
+public class EpochEntry {
+    public final int epoch;
+    public final long startOffset;
+
+    public EpochEntry(int epoch, long startOffset) {
+        this.epoch = epoch;
+        this.startOffset = startOffset;
+    }
+