This is an automated email from the ASF dual-hosted git repository. jolshan pushed a commit to branch KAFKA-15626 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 7bbf82b17e30f6b824a6fdd14bb19c9c5fb30a1a Author: Justine <jols...@confluent.io> AuthorDate: Wed Oct 18 10:27:54 2023 -0700 Add sentinel and usage --- core/src/main/scala/kafka/cluster/Partition.scala | 5 +-- core/src/main/scala/kafka/log/UnifiedLog.scala | 17 +++++----- .../main/scala/kafka/server/ReplicaManager.scala | 5 +-- .../scala/unit/kafka/cluster/PartitionTest.scala | 6 ++-- .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 33 +++++++++++--------- .../unit/kafka/log/VerificationGuardTest.scala | 36 ++++++++++++++++++++++ .../unit/kafka/server/ReplicaManagerTest.scala | 16 +++++----- .../storage/internals/log/VerificationGuard.java | 14 ++++++++- 8 files changed, 93 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index d89dddd5c0e..fb66c4649aa 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -581,7 +581,8 @@ class Partition(val topicPartition: TopicPartition, } } - // Returns a VerificationGuard if we need to verify. This starts or continues the verification process. Otherwise return null. + // Returns a VerificationGuard if we need to verify. This starts or continues the verification process. Otherwise return the + // sentinel verification guard. def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = { leaderLogIfLocal match { case Some(log) => log.maybeStartTransactionVerification(producerId, sequence, epoch) @@ -1301,7 +1302,7 @@ class Partition(val topicPartition: TopicPartition, } def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int, - requestLocal: RequestLocal, verificationGuard: VerificationGuard = null): LogAppendInfo = { + requestLocal: RequestLocal, verificationGuard: VerificationGuard = VerificationGuard.SENTINEL_VERIFICATION_GUARD): LogAppendInfo = { val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) { leaderLogIfLocal match { case Some(leaderLog) => diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index 2895ae71fac..683260d74a9 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -600,11 +600,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Maybe create and return the VerificationGuard for the given producer ID if the transaction is not yet ongoing. - * Creation starts the verification process. Otherwise return null. + * Creation starts the verification process. Otherwise return the Sentinel VerificationGuard. */ def maybeStartTransactionVerification(producerId: Long, sequence: Int, epoch: Short): VerificationGuard = lock synchronized { if (hasOngoingTransaction(producerId)) - null + VerificationGuard.SENTINEL_VERIFICATION_GUARD else maybeCreateVerificationGuard(producerId, sequence, epoch) } @@ -619,11 +619,12 @@ class UnifiedLog(@volatile var logStartOffset: Long, } /** - * If an VerificationStateEntry is present for the given producer ID, return its VerificationGuard, otherwise, return null. + * If an VerificationStateEntry is present for the given producer ID, return its VerificationGuard, otherwise, return the + * sentinel VerificationGuard. */ def verificationGuard(producerId: Long): VerificationGuard = lock synchronized { val entry = producerStateManager.verificationStateEntry(producerId) - if (entry != null) entry.verificationGuard else null + if (entry != null) entry.verificationGuard else VerificationGuard.SENTINEL_VERIFICATION_GUARD } /** @@ -715,7 +716,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, origin: AppendOrigin = AppendOrigin.CLIENT, interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest, requestLocal: RequestLocal = RequestLocal.NoCaching, - verificationGuard: VerificationGuard = null): LogAppendInfo = { + verificationGuard: VerificationGuard = VerificationGuard.SENTINEL_VERIFICATION_GUARD): LogAppendInfo = { val validateAndAssignOffsets = origin != AppendOrigin.RAFT_LEADER append(records, origin, interBrokerProtocolVersion, validateAndAssignOffsets, leaderEpoch, Some(requestLocal), verificationGuard, ignoreRecordSize = false) } @@ -734,7 +735,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, validateAndAssignOffsets = false, leaderEpoch = -1, requestLocal = None, - verificationGuard = null, + verificationGuard = VerificationGuard.SENTINEL_VERIFICATION_GUARD, // disable to check the validation of record size since the record is already accepted by leader. ignoreRecordSize = true) } @@ -1059,7 +1060,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, // 2. Subsequent appends: Once we write to the transaction, the in-memory state currentTxnFirstOffset is populated. This field remains until the // transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still // ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and - // requestVerificationGuard is null, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1. + // requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1. if (batch.isTransactional && !hasOngoingTransaction(batch.producerId) && batchMissingRequiredVerification(batch, requestVerificationGuard)) throw new InvalidTxnStateException("Record was not part of an ongoing transaction") } @@ -1082,7 +1083,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: VerificationGuard): Boolean = { producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && !batch.isControlBatch && - (requestVerificationGuard == null || !requestVerificationGuard.equals(verificationGuard(batch.producerId))) + !verificationGuard(batch.producerId).verifiedBy(requestVerificationGuard) } /** diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index f02745c2ecc..73064220f9e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -880,7 +880,7 @@ class ReplicaManager(val config: KafkaConfig, // We return VerificationGuard if the partition needs to be verified. If no state is present, no need to verify. val firstBatch = records.firstBatch val verificationGuard = getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId, firstBatch.baseSequence, firstBatch.producerEpoch) - if (verificationGuard != null) { + if (verificationGuard != VerificationGuard.SENTINEL_VERIFICATION_GUARD) { verificationGuards.put(topicPartition, verificationGuard) unverifiedEntries.put(topicPartition, records) } else @@ -1183,7 +1183,8 @@ class ReplicaManager(val config: KafkaConfig, } else { try { val partition = getPartitionOrException(topicPartition) - val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal, verificationGuards.getOrElse(topicPartition, null)) + val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal, + verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL_VERIFICATION_GUARD)) val numAppendedMessages = info.numMessages // update stats for successfully appended bytes and messages as bytesInRate and messageInRate diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 17b40f30b81..b571298222c 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -3549,9 +3549,9 @@ class PartitionTest extends AbstractPartitionTest { // When VerificationGuard is not there, we should not be able to append. assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching)) - // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-null VerificationGuard. + // Before appendRecordsToLeader is called, ReplicaManager will call maybeStartTransactionVerification. We should get a non-sentinel VerificationGuard. val verificationGuard = partition.maybeStartTransactionVerification(producerId, 3, 0) - assertNotNull(verificationGuard) + assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard) // With the wrong VerificationGuard, append should fail. assertThrows(classOf[InvalidTxnStateException], () => partition.appendRecordsToLeader(transactionRecords(), @@ -3564,7 +3564,7 @@ class PartitionTest extends AbstractPartitionTest { // We should no longer need a VerificationGuard. Future appends without VerificationGuard will also succeed. val verificationGuard3 = partition.maybeStartTransactionVerification(producerId, 3, 0) - assertNull(verificationGuard3) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard3) partition.appendRecordsToLeader(transactionRecords(), origin = AppendOrigin.CLIENT, requiredAcks = 1, RequestLocal.withThreadConfinedCaching) } diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index b01c5af0217..f6640f5e0cc 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -36,7 +36,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler} import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException} +import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest @@ -3738,7 +3738,8 @@ class UnifiedLogTest { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig) assertFalse(log.hasOngoingTransaction(producerId)) - assertNull(log.verificationGuard(producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId)) + assertFalse(log.verificationGuard(producerId).verifiedBy(VerificationGuard.SENTINEL_VERIFICATION_GUARD)) val idempotentRecords = MemoryRecords.withIdempotentRecords( CompressionType.NONE, @@ -3763,7 +3764,7 @@ class UnifiedLogTest { ) val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch) - assertNotNull(verificationGuard) + assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard) log.appendAsLeader(idempotentRecords, origin = appendOrigin, leaderEpoch = 0) assertFalse(log.hasOngoingTransaction(producerId)) @@ -3772,13 +3773,14 @@ class UnifiedLogTest { assertEquals(verificationGuard, log.verificationGuard(producerId)) // Now write the transactional records + assertTrue(log.verificationGuard(producerId).verifiedBy(verificationGuard)) log.appendAsLeader(transactionalRecords, origin = appendOrigin, leaderEpoch = 0, verificationGuard = verificationGuard) assertTrue(log.hasOngoingTransaction(producerId)) // VerificationGuard should be cleared now. - assertNull(log.verificationGuard(producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId)) // A subsequent maybeStartTransactionVerification will be empty since we are already verified. - assertNull(log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.maybeStartTransactionVerification(producerId, sequence, producerEpoch)) val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker( producerId, @@ -3788,15 +3790,16 @@ class UnifiedLogTest { log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0) assertFalse(log.hasOngoingTransaction(producerId)) - assertNull(log.verificationGuard(producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId)) if (appendOrigin == AppendOrigin.CLIENT) sequence = sequence + 1 // A new maybeStartTransactionVerification will not be empty, as we need to verify the next transaction. val newVerificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch) - assertNotNull(newVerificationGuard) + assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, newVerificationGuard) assertNotEquals(verificationGuard, newVerificationGuard) + assertFalse(verificationGuard.verifiedBy(newVerificationGuard)) } @Test @@ -3809,7 +3812,7 @@ class UnifiedLogTest { val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig) val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch) - assertNotNull(verificationGuard) + assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard) val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker( producerId, @@ -3819,7 +3822,7 @@ class UnifiedLogTest { log.appendAsLeader(endTransactionMarkerRecord, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0) assertFalse(log.hasOngoingTransaction(producerId)) - assertNull(log.verificationGuard(producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId)) } @Test @@ -3832,7 +3835,7 @@ class UnifiedLogTest { val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig) val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch) - assertNotNull(verificationGuard) + assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard) producerStateManagerConfig.setTransactionVerificationEnabled(false) @@ -3847,7 +3850,7 @@ class UnifiedLogTest { log.appendAsLeader(transactionalRecords, leaderEpoch = 0) assertTrue(log.hasOngoingTransaction(producerId)) - assertNull(log.verificationGuard(producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId)) } @Test @@ -3872,14 +3875,14 @@ class UnifiedLogTest { ) assertThrows(classOf[InvalidTxnStateException], () => log.appendAsLeader(transactionalRecords, leaderEpoch = 0)) assertFalse(log.hasOngoingTransaction(producerId)) - assertNull(log.verificationGuard(producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId)) val verificationGuard = log.maybeStartTransactionVerification(producerId, sequence, producerEpoch) - assertNotNull(verificationGuard) + assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, verificationGuard) log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard) assertTrue(log.hasOngoingTransaction(producerId)) - assertNull(log.verificationGuard(producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId)) } @Test @@ -3892,7 +3895,7 @@ class UnifiedLogTest { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5) val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig) assertFalse(log.hasOngoingTransaction(producerId)) - assertNull(log.verificationGuard(producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, log.verificationGuard(producerId)) val transactionalRecords = MemoryRecords.withTransactionalRecords( CompressionType.NONE, diff --git a/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala b/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala new file mode 100644 index 00000000000..07623780c93 --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/VerificationGuardTest.scala @@ -0,0 +1,36 @@ +package unit.kafka.log + +import org.apache.kafka.storage.internals.log.VerificationGuard +import org.apache.kafka.storage.internals.log.VerificationGuard.SENTINEL_VERIFICATION_GUARD +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertTrue} +import org.junit.jupiter.api.Test + +class VerificationGuardTest { + + @Test + def testEqualsAndHashCode(): Unit = { + val verificationGuard1 = new VerificationGuard + val verificationGuard2 = new VerificationGuard + + assertNotEquals(verificationGuard1, verificationGuard2) + assertNotEquals(SENTINEL_VERIFICATION_GUARD, verificationGuard1) + assertEquals(SENTINEL_VERIFICATION_GUARD, SENTINEL_VERIFICATION_GUARD) + + assertNotEquals(verificationGuard1.hashCode, verificationGuard2.hashCode) + assertNotEquals(SENTINEL_VERIFICATION_GUARD.hashCode, verificationGuard1.hashCode) + assertEquals(SENTINEL_VERIFICATION_GUARD.hashCode, SENTINEL_VERIFICATION_GUARD.hashCode) + } + + @Test + def testVerifiedBy(): Unit = { + val verificationGuard1 = new VerificationGuard + val verificationGuard2 = new VerificationGuard + + assertFalse(verificationGuard1.verifiedBy(verificationGuard2)) + assertFalse(verificationGuard1.verifiedBy(SENTINEL_VERIFICATION_GUARD)) + assertFalse(SENTINEL_VERIFICATION_GUARD.verifiedBy(verificationGuard1)) + assertFalse(SENTINEL_VERIFICATION_GUARD.verifiedBy(SENTINEL_VERIFICATION_GUARD)) + assertTrue(verificationGuard1.verifiedBy(verificationGuard1)) + } + +} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 50e43d799bd..91874d8ab2c 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -60,7 +60,7 @@ import org.apache.kafka.server.common.OffsetAndEpoch import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.util.{MockScheduler, MockTime} -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo} +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchIsolation, FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, VerificationGuard} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest @@ -2163,7 +2163,7 @@ class ReplicaManagerTest { new SimpleRecord("message".getBytes)) appendRecords(replicaManager, tp0, idempotentRecords) verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any[AddPartitionsToTxnManager.AppendCallback]()) - assertNull(getVerificationGuard(replicaManager, tp0, producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId)) // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records. val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 1, @@ -2179,8 +2179,8 @@ class ReplicaManagerTest { ArgumentMatchers.eq(Seq(tp0)), any[AddPartitionsToTxnManager.AppendCallback]() ) - assertNotNull(getVerificationGuard(replicaManager, tp0, producerId)) - assertNull(getVerificationGuard(replicaManager, tp1, producerId)) + assertNotEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp1, producerId)) } finally { replicaManager.shutdown(checkpointHW = false) } @@ -2238,7 +2238,7 @@ class ReplicaManagerTest { val callback2: AddPartitionsToTxnManager.AppendCallback = appendCallback2.getValue() callback2(Map.empty[TopicPartition, Errors].toMap) - assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId)) assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId)) } finally { replicaManager.shutdown(checkpointHW = false) @@ -2424,7 +2424,7 @@ class ReplicaManagerTest { val transactionalRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, new SimpleRecord(s"message $sequence".getBytes)) appendRecords(replicaManager, tp, transactionalRecords, transactionalId = transactionalId) - assertNull(getVerificationGuard(replicaManager, tp, producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp, producerId)) // We should not add these partitions to the manager to verify. verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any()) @@ -2442,7 +2442,7 @@ class ReplicaManagerTest { appendRecords(replicaManager, tp, moreTransactionalRecords, transactionalId = transactionalId) verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any()) - assertEquals(null, getVerificationGuard(replicaManager, tp, producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp, producerId)) assertTrue(replicaManager.localLog(tp).get.hasOngoingTransaction(producerId)) } finally { replicaManager.shutdown(checkpointHW = false) @@ -2496,7 +2496,7 @@ class ReplicaManagerTest { // This time we do not verify appendRecords(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId) verify(addPartitionsToTxnManager, times(1)).verifyTransaction(any(), any(), any(), any(), any()) - assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId)) + assertEquals(VerificationGuard.SENTINEL_VERIFICATION_GUARD, getVerificationGuard(replicaManager, tp0, producerId)) assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId)) } finally { replicaManager.shutdown(checkpointHW = false) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java index e079707c7ab..0235769ee1a 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationGuard.java @@ -19,6 +19,10 @@ package org.apache.kafka.storage.internals.log; import java.util.concurrent.atomic.AtomicLong; public class VerificationGuard { + + // The sentinel verification guard will be used as a default when no verification guard is provided. + // It can not be used to verify a transaction is ongoing and its verificationGuardValue is always 0. + public static final VerificationGuard SENTINEL_VERIFICATION_GUARD = new VerificationGuard(0); private static final AtomicLong INCREMENTING_ID = new AtomicLong(0L); private final long verificationGuardValue; @@ -26,6 +30,10 @@ public class VerificationGuard { verificationGuardValue = INCREMENTING_ID.incrementAndGet(); } + private VerificationGuard(long value) { + verificationGuardValue = value; + } + @Override public String toString() { return "VerificationGuard: " + verificationGuardValue; @@ -45,7 +53,11 @@ public class VerificationGuard { return (int) (value ^ (value >>> 32)); } - public long verificationGuardValue() { + private long verificationGuardValue() { return verificationGuardValue; } + + public boolean verifiedBy(VerificationGuard verifyingGuard) { + return verifyingGuard != SENTINEL_VERIFICATION_GUARD && verifyingGuard.equals(this); + } }