This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch KAFKA-15183
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d3aaeae757da5b1a1ea737bd6da6dc5dd062bf58
Author: Colin P. McCabe <[email protected]>
AuthorDate: Mon Jul 24 15:59:55 2023 -0700

    Make sure sizeInBytes can be called after freeze in every case. fix test
---
 .../kafka/server/KRaftClusterTest.scala            |  2 +-
 .../kafka/snapshot/FileRawSnapshotWriter.java      | 14 +++++++++-----
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 22 +++++++++-------------
 .../kafka/snapshot/MockRawSnapshotWriter.java      |  2 +-
 4 files changed, 20 insertions(+), 20 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 242c7548853..94e6118d7d4 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1153,7 +1153,7 @@ class KRaftClusterTest {
       val controller = cluster.controllers().values().iterator().next()
       controller.controller.waitForReadyBrokers(3).get()
       TestUtils.retry(60000) {
-        val latch = 
QuorumControllerIntegrationTestUtils.pause(controller.asInstanceOf[QuorumController])
+        val latch = 
QuorumControllerIntegrationTestUtils.pause(controller.controller.asInstanceOf[QuorumController])
         Thread.sleep(1001)
         latch.countDown()
         assertEquals(0, 
controller.sharedServer.controllerServerMetrics.fencedBrokerCount())
diff --git 
a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java 
b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
index badefd321ed..535c176c728 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
@@ -35,7 +35,7 @@ public final class FileRawSnapshotWriter implements 
RawSnapshotWriter {
     private final FileChannel channel;
     private final OffsetAndEpoch snapshotId;
     private final Optional<ReplicatedLog> replicatedLog;
-    private boolean frozen = false;
+    private long frozenSize;
 
     private FileRawSnapshotWriter(
         Path tempSnapshotPath,
@@ -47,6 +47,7 @@ public final class FileRawSnapshotWriter implements 
RawSnapshotWriter {
         this.channel = channel;
         this.snapshotId = snapshotId;
         this.replicatedLog = replicatedLog;
+        this.frozenSize = -1L;
     }
 
     @Override
@@ -56,6 +57,9 @@ public final class FileRawSnapshotWriter implements 
RawSnapshotWriter {
 
     @Override
     public long sizeInBytes() {
+        if (frozenSize >= 0) {
+            return frozenSize;
+        }
         try {
             return channel.size();
         } catch (IOException e) {
@@ -99,7 +103,7 @@ public final class FileRawSnapshotWriter implements 
RawSnapshotWriter {
 
     @Override
     public boolean isFrozen() {
-        return frozen;
+        return frozenSize >= 0;
     }
 
     @Override
@@ -107,8 +111,8 @@ public final class FileRawSnapshotWriter implements 
RawSnapshotWriter {
         try {
             checkIfFrozen("Freeze");
 
+            frozenSize = channel.size();
             channel.close();
-            frozen = true;
 
             if (!tempSnapshotPath.toFile().setReadOnly()) {
                 throw new IllegalStateException(String.format("Unable to set 
file (%s) as read-only", tempSnapshotPath));
@@ -148,12 +152,12 @@ public final class FileRawSnapshotWriter implements 
RawSnapshotWriter {
             "FileRawSnapshotWriter(path=%s, snapshotId=%s, frozen=%s)",
             tempSnapshotPath,
             snapshotId,
-            frozen
+            isFrozen()
         );
     }
 
     void checkIfFrozen(String operation) {
-        if (frozen) {
+        if (isFrozen()) {
             throw new IllegalStateException(
                 String.format(
                     "%s is not supported. Snapshot is already frozen: id = %s; 
temp path = %s",
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index bc1d0ca21fc..f5b1ea15c3d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -48,6 +48,7 @@ import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.OptionalInt;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -1832,12 +1833,12 @@ final public class KafkaRaftClientSnapshotTest {
     private final static class MemorySnapshotWriter implements 
RawSnapshotWriter {
         private final OffsetAndEpoch snapshotId;
         private ByteBuffer data;
-        private boolean frozen;
+        private AtomicLong frozenPosition;
 
         public MemorySnapshotWriter(OffsetAndEpoch snapshotId) {
             this.snapshotId = snapshotId;
             this.data = ByteBuffer.allocate(0);
-            this.frozen = false;
+            this.frozenPosition = new AtomicLong(-1L);
         }
 
         @Override
@@ -1847,16 +1848,13 @@ final public class KafkaRaftClientSnapshotTest {
 
         @Override
         public long sizeInBytes() {
-            if (frozen) {
-                throw new RuntimeException("Snapshot is already frozen " + 
snapshotId);
-            }
-
-            return data.position();
+            long position = frozenPosition.get();
+            return (position < 0) ? data.position() : position;
         }
 
         @Override
         public void append(UnalignedMemoryRecords records) {
-            if (frozen) {
+            if (isFrozen()) {
                 throw new RuntimeException("Snapshot is already frozen " + 
snapshotId);
             }
             append(records.buffer());
@@ -1864,7 +1862,7 @@ final public class KafkaRaftClientSnapshotTest {
 
         @Override
         public void append(MemoryRecords records) {
-            if (frozen) {
+            if (isFrozen()) {
                 throw new RuntimeException("Snapshot is already frozen " + 
snapshotId);
             }
             append(records.buffer());
@@ -1885,16 +1883,14 @@ final public class KafkaRaftClientSnapshotTest {
 
         @Override
         public boolean isFrozen() {
-            return frozen;
+            return frozenPosition.get() >= 0;
         }
 
         @Override
         public void freeze() {
-            if (frozen) {
+            if (!frozenPosition.compareAndSet(-1L, data.position())) {
                 throw new RuntimeException("Snapshot is already frozen " + 
snapshotId);
             }
-
-            frozen = true;
             data.flip();
         }
 
diff --git 
a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java 
b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
index 0b5cc66a0c1..103eb3781c6 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
@@ -46,7 +46,7 @@ public final class MockRawSnapshotWriter implements 
RawSnapshotWriter {
 
     @Override
     public long sizeInBytes() {
-        ensureNotFrozenOrClosed();
+        ensureOpen();
         return data.position();
     }
 

Reply via email to