mimaison commented on code in PR #18039:
URL: https://github.com/apache/kafka/pull/18039#discussion_r1886624182
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -206,4 +223,119 @@ public static Optional<CompletedTxn> updateProducers(ProducerStateManager produc
}
return completedTxn;
}
+
+ public static boolean isRemoteLogEnabled(boolean remoteStorageSystemEnable, LogConfig config, String topic) {
+ // Remote log is enabled only for non-compact and non-internal topics
+ return remoteStorageSystemEnable &&
+ !(config.compact || Topic.isInternal(topic)
+ || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topic)
+ || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topic)) &&
+ config.remoteStorageEnable();
+ }
+
+ // Visible for benchmarking
+ public static LogValidator.MetricsRecorder newValidatorMetricsRecorder(BrokerTopicMetrics allTopicsStats) {
+ return new LogValidator.MetricsRecorder() {
+ public void recordInvalidMagic() {
+ allTopicsStats.invalidMagicNumberRecordsPerSec().mark();
+ }
+
+ public void recordInvalidOffset() {
+ allTopicsStats.invalidOffsetOrSequenceRecordsPerSec().mark();
+ }
+
+ public void recordInvalidSequence() {
+ allTopicsStats.invalidOffsetOrSequenceRecordsPerSec().mark();
+ }
+
+ public void recordInvalidChecksums() {
+ allTopicsStats.invalidMessageCrcRecordsPerSec().mark();
+ }
+
+ public void recordNoKeyCompactedTopic() {
+ allTopicsStats.noKeyCompactedTopicRecordsPerSec().mark();
+ }
+ };
+ }
+
+ /**
+ * If the recordVersion is >= RecordVersion.V2, create a new LeaderEpochFileCache instance.
+ * Loading the epoch entries from the backing checkpoint file or the provided currentCache if not empty.
+ * Otherwise, the message format is considered incompatible and the existing LeaderEpoch file
+ * is deleted.
+ *
+ * @param dir The directory in which the log will reside
+ * @param topicPartition The topic partition
+ * @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
+ * @param recordVersion The record version
+ * @param logPrefix The logging prefix
+ * @param currentCache The current LeaderEpochFileCache instance (if any)
+ * @param scheduler The scheduler for executing asynchronous tasks
+ * @return The new LeaderEpochFileCache instance (if created), empty otherwise
+ */
+ public static Optional<LeaderEpochFileCache> maybeCreateLeaderEpochCache(File dir,
+ TopicPartition topicPartition,
+ LogDirFailureChannel logDirFailureChannel,
+ RecordVersion recordVersion,
+ String logPrefix,
+ Optional<LeaderEpochFileCache> currentCache,
+ Scheduler scheduler) throws IOException {
+ File leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir);
+
+ if (recordVersion.precedes(RecordVersion.V2)) {
+ if (leaderEpochFile.exists()) {
+ LOG.warn(logPrefix + "Deleting non-empty leader epoch cache due to incompatible message format " + recordVersion);
Review Comment:
Good catch! It's not the first time I've missed these. For some reason, I had the
inspection for this disabled in IntelliJ.