This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b6e70e9a54f MINOR: Add test for PartitionMetadataFile (#15714) b6e70e9a54f is described below commit b6e70e9a54f9bafe49f33b56ef089601bef3db91 Author: Cheng-Kai, Zhang <kai821...@gmail.com> AuthorDate: Wed Apr 24 13:01:35 2024 +0800 MINOR: Add test for PartitionMetadataFile (#15714) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- checkstyle/import-control-storage.xml | 3 +- .../checkpoint/PartitionMetadataFileTest.java | 98 ++++++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml index 03a8219f15a..97294dd7c7e 100644 --- a/checkstyle/import-control-storage.xml +++ b/checkstyle/import-control-storage.xml @@ -73,9 +73,10 @@ <subpackage name="storage.internals"> <allow pkg="com.fasterxml.jackson" /> <allow pkg="com.yammer.metrics.core" /> + <allow pkg="org.apache.kafka.common" /> <allow pkg="org.apache.kafka.server"/> <allow pkg="org.apache.kafka.storage.internals"/> - <allow pkg="org.apache.kafka.common" /> + <allow pkg="org.apache.kafka.test" /> <allow pkg="com.github.benmanes.caffeine.cache" /> </subpackage> diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java new file mode 100644 index 00000000000..a20b82a0ade --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentTopicIdException; + +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class PartitionMetadataFileTest { + private final File dir = TestUtils.tempDirectory(); + private final File file = PartitionMetadataFile.newFile(dir); + + @Test + public void testSetRecordWithDifferentTopicId() { + PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + Uuid topicId = Uuid.randomUuid(); + partitionMetadataFile.record(topicId); + Uuid differentTopicId = Uuid.randomUuid(); + assertThrows(InconsistentTopicIdException.class, () -> partitionMetadataFile.record(differentTopicId)); + } + + @Test + public void testSetRecordWithSameTopicId() { + PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + Uuid topicId = Uuid.randomUuid(); + partitionMetadataFile.record(topicId); + assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); + + Uuid sameTopicId = Uuid.fromString(topicId.toString()); + partitionMetadataFile.record(sameTopicId); + assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); + } + + @Test + public void testMaybeFlushWithTopicIdPresent() throws IOException { + PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + + Uuid topicId = Uuid.randomUuid(); + partitionMetadataFile.record(topicId); + partitionMetadataFile.maybeFlush(); + + // The following content is encoded by PartitionMetadata#encode, which is invoked during the flush + List<String> lines = Files.readAllLines(file.toPath()); + assertEquals(2, lines.size()); + assertEquals("version: 0", lines.get(0)); + assertEquals("topic_id: " + topicId, lines.get(1)); + } + + @Test + public void testMaybeFlushWithNoTopicIdPresent() { + PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); + partitionMetadataFile.maybeFlush(); + + assertEquals(0, file.length()); + } + + @Test + public void testRead() { + LogDirFailureChannel channel = Mockito.mock(LogDirFailureChannel.class); + PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, channel); + + Uuid topicId = Uuid.randomUuid(); + partitionMetadataFile.record(topicId); + partitionMetadataFile.maybeFlush(); + + PartitionMetadata metadata = partitionMetadataFile.read(); + assertEquals(0, metadata.version()); + assertEquals(topicId, metadata.topicId()); + } +} \ No newline at end of file