This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch KAFKA-15183 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d3aaeae757da5b1a1ea737bd6da6dc5dd062bf58 Author: Colin P. McCabe <[email protected]> AuthorDate: Mon Jul 24 15:59:55 2023 -0700 Make sure sizeInBytes can be called after freeze in every case. fix test --- .../kafka/server/KRaftClusterTest.scala | 2 +- .../kafka/snapshot/FileRawSnapshotWriter.java | 14 +++++++++----- .../kafka/raft/KafkaRaftClientSnapshotTest.java | 22 +++++++++------------- .../kafka/snapshot/MockRawSnapshotWriter.java | 2 +- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 242c7548853..94e6118d7d4 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -1153,7 +1153,7 @@ class KRaftClusterTest { val controller = cluster.controllers().values().iterator().next() controller.controller.waitForReadyBrokers(3).get() TestUtils.retry(60000) { - val latch = QuorumControllerIntegrationTestUtils.pause(controller.asInstanceOf[QuorumController]) + val latch = QuorumControllerIntegrationTestUtils.pause(controller.controller.asInstanceOf[QuorumController]) Thread.sleep(1001) latch.countDown() assertEquals(0, controller.sharedServer.controllerServerMetrics.fencedBrokerCount()) diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java index badefd321ed..535c176c728 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java @@ -35,7 +35,7 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter { private final FileChannel channel; private final OffsetAndEpoch snapshotId; private final Optional<ReplicatedLog> replicatedLog; - private boolean frozen = false; + private long frozenSize; private FileRawSnapshotWriter( Path tempSnapshotPath, @@ -47,6 +47,7 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter { this.channel = channel; this.snapshotId = snapshotId; this.replicatedLog = replicatedLog; + this.frozenSize = -1L; } @Override @@ -56,6 +57,9 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter { @Override public long sizeInBytes() { + if (frozenSize >= 0) { + return frozenSize; + } try { return channel.size(); } catch (IOException e) { @@ -99,7 +103,7 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter { @Override public boolean isFrozen() { - return frozen; + return frozenSize >= 0; } @Override @@ -107,8 +111,8 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter { try { checkIfFrozen("Freeze"); + frozenSize = channel.size(); channel.close(); - frozen = true; if (!tempSnapshotPath.toFile().setReadOnly()) { throw new IllegalStateException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath)); @@ -148,12 +152,12 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter { "FileRawSnapshotWriter(path=%s, snapshotId=%s, frozen=%s)", tempSnapshotPath, snapshotId, - frozen + isFrozen() ); } void checkIfFrozen(String operation) { - if (frozen) { + if (isFrozen()) { throw new IllegalStateException( String.format( "%s is not supported. Snapshot is already frozen: id = %s; temp path = %s", diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index bc1d0ca21fc..f5b1ea15c3d 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -48,6 +48,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.OptionalInt; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -1832,12 +1833,12 @@ final public class KafkaRaftClientSnapshotTest { private final static class MemorySnapshotWriter implements RawSnapshotWriter { private final OffsetAndEpoch snapshotId; private ByteBuffer data; - private boolean frozen; + private AtomicLong frozenPosition; public MemorySnapshotWriter(OffsetAndEpoch snapshotId) { this.snapshotId = snapshotId; this.data = ByteBuffer.allocate(0); - this.frozen = false; + this.frozenPosition = new AtomicLong(-1L); } @Override @@ -1847,16 +1848,13 @@ final public class KafkaRaftClientSnapshotTest { @Override public long sizeInBytes() { - if (frozen) { - throw new RuntimeException("Snapshot is already frozen " + snapshotId); - } - - return data.position(); + long position = frozenPosition.get(); + return (position < 0) ? data.position() : position; } @Override public void append(UnalignedMemoryRecords records) { - if (frozen) { + if (isFrozen()) { throw new RuntimeException("Snapshot is already frozen " + snapshotId); } append(records.buffer()); @@ -1864,7 +1862,7 @@ final public class KafkaRaftClientSnapshotTest { @Override public void append(MemoryRecords records) { - if (frozen) { + if (isFrozen()) { throw new RuntimeException("Snapshot is already frozen " + snapshotId); } append(records.buffer()); @@ -1885,16 +1883,14 @@ final public class KafkaRaftClientSnapshotTest { @Override public boolean isFrozen() { - return frozen; + return frozenPosition.get() >= 0; } @Override public void freeze() { - if (frozen) { + if (!frozenPosition.compareAndSet(-1L, data.position())) { throw new RuntimeException("Snapshot is already frozen " + snapshotId); } - - frozen = true; data.flip(); } diff --git a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java index 0b5cc66a0c1..103eb3781c6 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java @@ -46,7 +46,7 @@ public final class MockRawSnapshotWriter implements RawSnapshotWriter { @Override public long sizeInBytes() { - ensureNotFrozenOrClosed(); + ensureOpen(); return data.position(); }
