This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 121ec2a662f KAFKA-15599 Move MetadataLogConfig to raft module (#19246)
121ec2a662f is described below
commit 121ec2a662fa74ee59899f87a673e5b3d957cde5
Author: Mickael Maison <[email protected]>
AuthorDate: Fri Mar 21 06:44:20 2025 +0100
KAFKA-15599 Move MetadataLogConfig to raft module (#19246)
Rewrite the class in Java and move it to the raft module.
Reviewers: PoAn Yang <[email protected]>, TengYao Chi
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control.xml | 1 +
core/src/main/scala/kafka/MetadataLogConfig.scala | 48 -------
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 4 +-
core/src/main/scala/kafka/raft/RaftManager.scala | 13 +-
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 139 +++++++++++++--------
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 24 ++--
.../org/apache/kafka/raft/MetadataLogConfig.java | 40 ++++++
7 files changed, 154 insertions(+), 115 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 37e08d3309e..318c1721928 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -488,6 +488,7 @@
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.common.serialization" />
+ <allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault"/>
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.test"/>
diff --git a/core/src/main/scala/kafka/MetadataLogConfig.scala
b/core/src/main/scala/kafka/MetadataLogConfig.scala
deleted file mode 100755
index 20b5b23539e..00000000000
--- a/core/src/main/scala/kafka/MetadataLogConfig.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.raft
-
-import org.apache.kafka.server.config.ServerLogConfigs
-import kafka.server.KafkaConfig
-
-final case class MetadataLogConfig(
- logSegmentBytes: Int,
- logSegmentMinBytes: Int,
- logSegmentMillis: Long,
- retentionMaxBytes: Long,
- retentionMillis: Long,
- maxBatchSizeInBytes: Int,
- maxFetchSizeInBytes: Int,
- fileDeleteDelayMs: Long,
- nodeId: Int
-)
-
-object MetadataLogConfig {
- def apply(config: KafkaConfig, maxBatchSizeInBytes: Int,
maxFetchSizeInBytes: Int): MetadataLogConfig = {
- new MetadataLogConfig(
- config.metadataLogSegmentBytes,
- config.metadataLogSegmentMinBytes,
- config.metadataLogSegmentMillis,
- config.metadataRetentionBytes,
- config.metadataRetentionMillis,
- maxBatchSizeInBytes,
- maxFetchSizeInBytes,
- ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
- config.metadataNodeIDConfig
- )
- }
-}
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 1d1f18a1392..ebc2f3c81cf 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
-import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo,
LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog,
SegmentPosition, ValidOffsetAndEpoch}
+import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo,
LogFetchInfo, LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch,
OffsetMetadata, ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.Scheduler
@@ -554,7 +554,7 @@ final class KafkaMetadataLog private (
scheduler.scheduleOnce(
"delete-snapshot-files",
() => KafkaMetadataLog.deleteSnapshotFiles(log.dir.toPath,
expiredSnapshots, this),
- config.fileDeleteDelayMs
+ config.deleteDelayMillis
)
}
}
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala
b/core/src/main/scala/kafka/raft/RaftManager.scala
index e55e91eaaef..522c37c0829 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -41,10 +41,11 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
-import org.apache.kafka.raft.{ExternalKRaftMetrics, Endpoints,
FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient,
KafkaRaftClientDriver, LeaderAndEpoch, QuorumConfig, RaftClient, ReplicatedLog,
TimingWheelExpirationService}
+import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics,
FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient,
KafkaRaftClientDriver, LeaderAndEpoch, MetadataLogConfig, QuorumConfig,
RaftClient, ReplicatedLog, TimingWheelExpirationService}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.common.Feature
import org.apache.kafka.server.common.serialization.RecordSerde
+import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
import org.apache.kafka.server.fault.FaultHandler
import org.apache.kafka.server.util.timer.SystemTimer
@@ -230,7 +231,15 @@ class KafkaRaftManager[T](
dataDir,
time,
scheduler,
- config = MetadataLogConfig(config, KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
+ config = new MetadataLogConfig(config.metadataLogSegmentBytes,
+ config.metadataLogSegmentMinBytes,
+ config.metadataLogSegmentMillis,
+ config.metadataRetentionBytes,
+ config.metadataRetentionMillis,
+ KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+ ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
+ config.metadataNodeIDConfig)
)
}
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 6bc49be315b..466fb954d13 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.record.ArbitraryMemoryRecords
import org.apache.kafka.common.record.InvalidMemoryRecordsProvider
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.raft._
+import org.apache.kafka.raft.{KafkaRaftClient, LogAppendInfo,
LogOffsetMetadata, MetadataLogConfig, OffsetAndEpoch, QuorumConfig,
ReplicatedLog, SegmentPosition, ValidOffsetAndEpoch}
import org.apache.kafka.raft.internals.BatchBuilder
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
@@ -80,13 +80,31 @@ final class KafkaMetadataLogTest {
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MILLIS_CONFIG, Int.box(10 *
1024))
assertThrows(classOf[InvalidConfigurationException], () => {
val kafkaConfig = KafkaConfig.fromProps(props)
- val metadataConfig = MetadataLogConfig(kafkaConfig,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
+ val metadataConfig = new MetadataLogConfig(
+ kafkaConfig.metadataLogSegmentBytes,
+ kafkaConfig.metadataLogSegmentMinBytes,
+ kafkaConfig.metadataLogSegmentMillis,
+ kafkaConfig.metadataRetentionBytes,
+ kafkaConfig.metadataRetentionMillis,
+ KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+ ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
+ kafkaConfig.metadataNodeIDConfig)
buildMetadataLog(tempDir, mockTime, metadataConfig)
})
props.put(KRaftConfigs.METADATA_LOG_SEGMENT_MIN_BYTES_CONFIG,
Int.box(10240))
val kafkaConfig = KafkaConfig.fromProps(props)
- val metadataConfig = MetadataLogConfig(kafkaConfig,
KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
+ val metadataConfig = new MetadataLogConfig(
+ kafkaConfig.metadataLogSegmentBytes,
+ kafkaConfig.metadataLogSegmentMinBytes,
+ kafkaConfig.metadataLogSegmentMillis,
+ kafkaConfig.metadataRetentionBytes,
+ kafkaConfig.metadataRetentionMillis,
+ KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+ ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
+ kafkaConfig.metadataNodeIDConfig)
buildMetadataLog(tempDir, mockTime, metadataConfig)
}
@@ -129,8 +147,8 @@ final class KafkaMetadataLogTest {
def testEmptyAppendNotAllowed(): Unit = {
val log = buildMetadataLog(tempDir, mockTime)
- assertThrows(classOf[IllegalArgumentException], () =>
log.appendAsFollower(MemoryRecords.EMPTY, 1));
- assertThrows(classOf[IllegalArgumentException], () =>
log.appendAsLeader(MemoryRecords.EMPTY, 1));
+ assertThrows(classOf[IllegalArgumentException], () =>
log.appendAsFollower(MemoryRecords.EMPTY, 1))
+ assertThrows(classOf[IllegalArgumentException], () =>
log.appendAsLeader(MemoryRecords.EMPTY, 1))
}
@ParameterizedTest
@@ -140,7 +158,7 @@ final class KafkaMetadataLogTest {
val previousEndOffset = log.endOffset().offset()
val action: Executable = () => log.appendAsFollower(records, Int.MaxValue)
- if (expectedException.isPresent()) {
+ if (expectedException.isPresent) {
assertThrows(expectedException.get, action)
} else {
assertThrows(classOf[CorruptRecordException], action)
@@ -478,7 +496,7 @@ final class KafkaMetadataLogTest {
assertEquals(log.earliestSnapshotId(), log.latestSnapshotId())
log.close()
- mockTime.sleep(config.fileDeleteDelayMs)
+ mockTime.sleep(config.deleteDelayMillis)
// Assert that the log dir doesn't contain any older snapshots
Files
.walk(logDir, 1)
@@ -649,7 +667,7 @@ final class KafkaMetadataLogTest {
assertEquals(greaterSnapshotId, secondLog.latestSnapshotId().get)
assertEquals(3 * numberOfRecords, secondLog.startOffset)
assertEquals(epoch, secondLog.lastFetchedEpoch)
- mockTime.sleep(config.fileDeleteDelayMs)
+ mockTime.sleep(config.deleteDelayMillis)
// Assert that the log dir doesn't contain any older snapshots
Files
@@ -687,7 +705,18 @@ final class KafkaMetadataLogTest {
val leaderEpoch = 5
val maxBatchSizeInBytes = 16384
val recordSize = 64
- val log = buildMetadataLog(tempDir, mockTime,
DefaultMetadataLogConfig.copy(maxBatchSizeInBytes = maxBatchSizeInBytes))
+ val config = new MetadataLogConfig(
+ DefaultMetadataLogConfig.logSegmentBytes,
+ DefaultMetadataLogConfig.logSegmentMinBytes,
+ DefaultMetadataLogConfig.logSegmentMillis,
+ DefaultMetadataLogConfig.retentionMaxBytes,
+ DefaultMetadataLogConfig.retentionMillis,
+ maxBatchSizeInBytes,
+ DefaultMetadataLogConfig.maxFetchSizeInBytes,
+ DefaultMetadataLogConfig.deleteDelayMillis,
+ DefaultMetadataLogConfig.nodeId
+ )
+ val log = buildMetadataLog(tempDir, mockTime, config)
val oversizeBatch = buildFullBatch(leaderEpoch, recordSize,
maxBatchSizeInBytes + recordSize)
assertThrows(classOf[RecordTooLargeException], () => {
@@ -897,18 +926,17 @@ final class KafkaMetadataLogTest {
@Test
def testAdvanceLogStartOffsetAfterCleaning(): Unit = {
- val config = MetadataLogConfig(
- logSegmentBytes = 512,
- logSegmentMinBytes = 512,
- logSegmentMillis = 10 * 1000,
- retentionMaxBytes = 256,
- retentionMillis = 60 * 1000,
- maxBatchSizeInBytes = 512,
- maxFetchSizeInBytes = DefaultMetadataLogConfig.maxFetchSizeInBytes,
- fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
- nodeId = 1
+ val config = new MetadataLogConfig(
+ 512,
+ 512,
+ 10 * 1000,
+ 256,
+ 60 * 1000,
+ 512,
+ DefaultMetadataLogConfig.maxFetchSizeInBytes,
+ ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
+ 1
)
- config.copy()
val log = buildMetadataLog(tempDir, mockTime, config)
// Generate some segments
@@ -936,13 +964,16 @@ final class KafkaMetadataLogTest {
@Test
def testDeleteSnapshots(): Unit = {
// Generate some logs and a few snapshots, set retention low and verify
that cleaning occurs
- val config = DefaultMetadataLogConfig.copy(
- logSegmentBytes = 1024,
- logSegmentMinBytes = 1024,
- logSegmentMillis = 10 * 1000,
- retentionMaxBytes = 1024,
- retentionMillis = 60 * 1000,
- maxBatchSizeInBytes = 100
+ val config = new MetadataLogConfig( // positional args follow the record's component order
+ 1024, // logSegmentBytes
+ 1024, // logSegmentMinBytes
+ 10 * 1000, // logSegmentMillis
+ 1024, // retentionMaxBytes
+ 60 * 1000, // retentionMillis
+ 100, // maxBatchSizeInBytes
+ DefaultMetadataLogConfig.maxFetchSizeInBytes, // keep the default, as the previous copy(...) did
+ DefaultMetadataLogConfig.deleteDelayMillis, // keep the default, as the previous copy(...) did
+ DefaultMetadataLogConfig.nodeId
)
val log = buildMetadataLog(tempDir, mockTime, config)
@@ -968,13 +999,16 @@ final class KafkaMetadataLogTest {
@Test
def testSoftRetentionLimit(): Unit = {
// Set retention equal to the segment size and generate slightly more than
one segment of logs
- val config = DefaultMetadataLogConfig.copy(
- logSegmentBytes = 10240,
- logSegmentMinBytes = 10240,
- logSegmentMillis = 10 * 1000,
- retentionMaxBytes = 10240,
- retentionMillis = 60 * 1000,
- maxBatchSizeInBytes = 100
+ val config = new MetadataLogConfig(
+ 10240,
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ 100,
+ DefaultMetadataLogConfig.maxFetchSizeInBytes,
+ DefaultMetadataLogConfig.deleteDelayMillis,
+ DefaultMetadataLogConfig.nodeId
)
val log = buildMetadataLog(tempDir, mockTime, config)
@@ -1010,13 +1044,16 @@ final class KafkaMetadataLogTest {
@Test
def testSegmentsLessThanLatestSnapshot(): Unit = {
- val config = DefaultMetadataLogConfig.copy(
- logSegmentBytes = 10240,
- logSegmentMinBytes = 10240,
- logSegmentMillis = 10 * 1000,
- retentionMaxBytes = 10240,
- retentionMillis = 60 * 1000,
- maxBatchSizeInBytes = 200
+ val config = new MetadataLogConfig(
+ 10240,
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ 200,
+ DefaultMetadataLogConfig.maxFetchSizeInBytes,
+ DefaultMetadataLogConfig.deleteDelayMillis,
+ DefaultMetadataLogConfig.nodeId
)
val log = buildMetadataLog(tempDir, mockTime, config)
@@ -1067,16 +1104,16 @@ object KafkaMetadataLogTest {
override def read(input: protocol.Readable, size: Int): Array[Byte] =
input.readArray(size)
}
- val DefaultMetadataLogConfig = MetadataLogConfig(
- logSegmentBytes = 100 * 1024,
- logSegmentMinBytes = 100 * 1024,
- logSegmentMillis = 10 * 1000,
- retentionMaxBytes = 100 * 1024,
- retentionMillis = 60 * 1000,
- maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
- maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
- fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
- nodeId = 1
+ val DefaultMetadataLogConfig = new MetadataLogConfig(
+ 100 * 1024,
+ 100 * 1024,
+ 10 * 1000,
+ 100 * 1024,
+ 60 * 1000,
+ KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+ ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
+ 1
)
def buildMetadataLogAndDir(
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index f295a381a52..11d33787dd6 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -25,7 +25,7 @@ import java.util.Optional
import java.util.Properties
import java.util.stream.IntStream
import kafka.log.LogTestUtils
-import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
+import kafka.raft.KafkaMetadataLog
import kafka.server.KafkaRaftServer
import kafka.tools.DumpLogSegments.{OffsetsMessageParser,
ShareGroupStateMessageParser, TimeIndexDumpErrors, TransactionLogMessageParser}
import kafka.utils.TestUtils
@@ -43,7 +43,7 @@ import
org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnap
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey,
TransactionLogValue}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.metadata.MetadataRecordSerde
-import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch, VoterSetTest}
+import org.apache.kafka.raft.{KafkaRaftClient, MetadataLogConfig,
OffsetAndEpoch, VoterSetTest}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion}
import org.apache.kafka.server.config.ServerLogConfigs
import
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde
@@ -544,16 +544,16 @@ class DumpLogSegmentsTest {
logDir,
time,
time.scheduler,
- MetadataLogConfig(
- logSegmentBytes = 100 * 1024,
- logSegmentMinBytes = 100 * 1024,
- logSegmentMillis = 10 * 1000,
- retentionMaxBytes = 100 * 1024,
- retentionMillis = 60 * 1000,
- maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
- maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
- fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
- nodeId = 1
+ new MetadataLogConfig(
+ 100 * 1024,
+ 100 * 1024,
+ 10 * 1000,
+ 100 * 1024,
+ 60 * 1000,
+ KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+ ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT,
+ 1
)
)
diff --git a/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
new file mode 100644
index 00000000000..869966e1791
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+/**
+ * Configuration for the metadata log
+ * @param logSegmentBytes The maximum size of a single metadata log file
+ * @param logSegmentMinBytes The minimum size of a single metadata log file
+ * @param logSegmentMillis The maximum time before a new metadata log file is
rolled out
+ * @param retentionMaxBytes The size of the metadata log and snapshots before
deleting old snapshots and log files
+ * @param retentionMillis The time to keep a metadata log file or snapshot
before deleting it
+ * @param maxBatchSizeInBytes The largest record batch size allowed in the
metadata log
+ * @param maxFetchSizeInBytes The maximum number of bytes to read when
fetching from the metadata log
+ * @param deleteDelayMillis The amount of time to wait before deleting a file
from the filesystem
+ * @param nodeId The node id
+ */
+public record MetadataLogConfig(int logSegmentBytes,
+ int logSegmentMinBytes,
+ long logSegmentMillis,
+ long retentionMaxBytes,
+ long retentionMillis,
+ int maxBatchSizeInBytes,
+ int maxFetchSizeInBytes,
+ long deleteDelayMillis,
+ int nodeId) {
+}