ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1056745406


##########
core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala:
##########
@@ -62,12 +62,17 @@ trait OffsetCheckpoint {
  *  -----checkpoint file end----------
  */
 class OffsetCheckpointFile(val file: File, logDirFailureChannel: 
LogDirFailureChannel = null) {
-  val checkpoint = new CheckpointFileWithFailureHandler[(TopicPartition, 
Long)](file, OffsetCheckpointFile.CurrentVersion,
+  val checkpoint = new CheckpointFileWithFailureHandler[Tuple2[TopicPartition, 
Long]](file, OffsetCheckpointFile.CurrentVersion,

Review Comment:
   What is the reason for this change?



##########
storage/src/main/java/org/apache/kafka/server/log/internals/EpochEntry.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+
+// Mapping of epoch to the first offset of the subsequent epoch
+public class EpochEntry {
+    public final int epoch;
+    public final long startOffset;
+
+    public EpochEntry(int epoch, long startOffset) {
+        this.epoch = epoch;
+        this.startOffset = startOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        EpochEntry that = (EpochEntry) o;
+        return epoch == that.epoch && startOffset == that.startOffset;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(epoch, startOffset);

Review Comment:
   This results in boxing of the two elements and the allocation of an array. 
To avoid unexpected regressions, can we please stick with an allocation-free 
implementation?



##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -255,7 +256,7 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
     }
 
     // Get the respective epoch in which the starting-offset exists.
-    var maybeEpoch = leaderEpochCache.epochForOffset(startingOffset)
+    var maybeEpoch = leaderEpochCache.epochForOffset(startingOffset).asScala

Review Comment:
   Similarly here, can we just use `Optional` instead of converting to `Option`?



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2650,7 +2647,7 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
 
     val leaderLog = partition.localLogOrException
-    assertEquals(Some(EpochEntry(leaderEpoch, 0L)), 
leaderLog.leaderEpochCache.flatMap(_.latestEntry))
+    assertEquals(Some(new EpochEntry(leaderEpoch, 0L)), 
leaderLog.leaderEpochCache.flatMap(_.latestEntry.asScala))

Review Comment:
   We can change the expected value to be `Optional.of` instead of converting 
the response with `asScala`.



##########
storage/src/main/java/org/apache/kafka/server/log/internals/EpochEntry.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Objects;
+
+// Mapping of epoch to the first offset of the subsequent epoch
+public class EpochEntry {
+    public final int epoch;
+    public final long startOffset;
+
+    public EpochEntry(int epoch, long startOffset) {
+        this.epoch = epoch;
+        this.startOffset = startOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        EpochEntry that = (EpochEntry) o;
+        return epoch == that.epoch && startOffset == that.startOffset;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(epoch, startOffset);
+    }
+
+    @Override
+    public String toString() {
+        return "EpochEntry{" +
+                "epoch=" + epoch +
+                ", startOffset=" + startOffset +
+                '}';

Review Comment:
   We typically use `(` and `)` rather than `{` and `}` in our `toString` 
implementations.



##########
storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochCheckpointFile.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.server.common.CheckpointFile.EntryFormatter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/**
+ * This class persists a map of (LeaderEpoch => Offsets) to a file (for a 
certain replica)
+ * <p>
+ * The format in the LeaderEpoch checkpoint file is like this:
+ * -----checkpoint file begin------
+ * 0                <- LeaderEpochCheckpointFile.currentVersion
+ * 2                <- following entries size
+ * 0  1     <- the format is: leader_epoch(int32) start_offset(int64)
+ * 1  2
+ * -----checkpoint file end----------
+ */
+public class LeaderEpochCheckpointFile implements LeaderEpochCheckpoint {
+    private static final String LEADER_EPOCH_CHECKPOINT_FILENAME = 
"leader-epoch-checkpoint";
+    private static final Pattern WHITE_SPACES_PATTERN = 
Pattern.compile("\\s+");
+    private static final int CURRENT_VERSION = 0;
+
+    public static final Formatter FORMATTER = new Formatter();

Review Comment:
   Please have the public field(s) before the private ones.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -995,11 +994,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
   }
 
-  def latestEpoch: Option[Int] = leaderEpochCache.flatMap(_.latestEpoch)
+  def latestEpoch: Option[Int] = 
leaderEpochCache.flatMap(_.latestEpoch.asScala).map(Int.unbox(_))

Review Comment:
   Can we use `OptionalInt` here?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1286,31 +1286,31 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         // The first cached epoch usually corresponds to the log start offset, 
but we have to verify this since
         // it may not be true following a message format version bump as the 
epoch will not be available for
         // log entries written in the older format.
-        val earliestEpochEntry = leaderEpochCache.flatMap(_.earliestEntry)
+        val earliestEpochEntry = 
leaderEpochCache.flatMap(_.earliestEntry().asScala)

Review Comment:
   To avoid multiple conversions, could we simply use `Optional` here and 
in the lines below?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to