chia7712 commented on code in PR #18039:
URL: https://github.com/apache/kafka/pull/18039#discussion_r1886748240


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -206,4 +223,119 @@ public static Optional<CompletedTxn> updateProducers(ProducerStateManager produc
         }
         return completedTxn;
     }
+
+    public static boolean isRemoteLogEnabled(boolean remoteStorageSystemEnable, LogConfig config, String topic) {
+        // Remote log is enabled only for non-compact and non-internal topics
+        return remoteStorageSystemEnable &&
+                !(config.compact || Topic.isInternal(topic)
+                        || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topic)
+                        || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topic)) &&
+                config.remoteStorageEnable();
+    }
+
+    // Visible for benchmarking
+    public static LogValidator.MetricsRecorder newValidatorMetricsRecorder(BrokerTopicMetrics allTopicsStats) {
+        return new LogValidator.MetricsRecorder() {
+            public void recordInvalidMagic() {
+                allTopicsStats.invalidMagicNumberRecordsPerSec().mark();
+            }
+
+            public void recordInvalidOffset() {
+                allTopicsStats.invalidOffsetOrSequenceRecordsPerSec().mark();
+            }
+
+            public void recordInvalidSequence() {
+                allTopicsStats.invalidOffsetOrSequenceRecordsPerSec().mark();
+            }
+
+            public void recordInvalidChecksums() {
+                allTopicsStats.invalidMessageCrcRecordsPerSec().mark();
+            }
+
+            public void recordNoKeyCompactedTopic() {
+                allTopicsStats.noKeyCompactedTopicRecordsPerSec().mark();
+            }
+        };
+    }
+
+    /**
+     * If the recordVersion is >= RecordVersion.V2, create a new LeaderEpochFileCache instance.
+     * Loading the epoch entries from the backing checkpoint file or the provided currentCache if not empty.
+     * Otherwise, the message format is considered incompatible and the existing LeaderEpoch file
+     * is deleted.
+     *
+     * @param dir                  The directory in which the log will reside
+     * @param topicPartition       The topic partition
+     * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
+     * @param recordVersion        The record version
+     * @param logPrefix            The logging prefix
+     * @param currentCache         The current LeaderEpochFileCache instance (if any)
+     * @param scheduler            The scheduler for executing asynchronous tasks
+     * @return The new LeaderEpochFileCache instance (if created), empty otherwise
+     */
+    public static Optional<LeaderEpochFileCache> maybeCreateLeaderEpochCache(File dir,
+                                                                             TopicPartition topicPartition,
+                                                                             LogDirFailureChannel logDirFailureChannel,
+                                                                             RecordVersion recordVersion,
+                                                                             String logPrefix,
+                                                                             Optional<LeaderEpochFileCache> currentCache,
+                                                                             Scheduler scheduler) throws IOException {
+        File leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir);
+
+        if (recordVersion.precedes(RecordVersion.V2)) {
+            if (leaderEpochFile.exists()) {
+                LOG.warn(logPrefix + "Deleting non-empty leader epoch cache due to incompatible message format " + recordVersion);
+            }
+            Files.deleteIfExists(leaderEpochFile.toPath());
+            return Optional.empty();
+        } else {
+            LeaderEpochCheckpointFile checkpointFile = new LeaderEpochCheckpointFile(leaderEpochFile, logDirFailureChannel);
+            return Optional.of(currentCache.map(cache -> cache.withCheckpoint(checkpointFile))
+                    .orElse(new LeaderEpochFileCache(topicPartition, checkpointFile, scheduler)));
+        }
+    }
+
+    public static LogSegment createNewCleanedSegment(File dir, LogConfig logConfig, long baseOffset) throws IOException {

Review Comment:
   Thanks for the documentation. It's good to know! :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to