This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b892acae5e0 KAFKA-15424: Make the transaction verification a dynamic 
configuration (#14324)
b892acae5e0 is described below

commit b892acae5e026e1affd51ef9756772807674b964
Author: Justine Olshan <[email protected]>
AuthorDate: Mon Sep 4 20:40:50 2023 -0700

    KAFKA-15424: Make the transaction verification a dynamic configuration 
(#14324)
    
    This will allow enabling and disabling transaction verification (KIP-890 
part 1) without having to roll the cluster.
    
    Tested that restarting the cluster persists the configuration.
    
    If verification is disabled/enabled while we have an inflight request, 
depending on the step of the process, the change may or may not be seen in the 
inflight request (enabling will typically fail unverified requests, but we may 
still verify and reject when we first disable). Subsequent requests/retries will 
behave as expected for verification.
    
    Sequence checks will continue to take place after disabling until the first 
message is written to the partition (thus clearing the verification entry with 
the tentative sequence) or the broker restarts/partition is reassigned, which 
will clear the memory. On enabling, we will only track sequences for 
requests received after verification is enabled.
    
    Reviewers: Jason Gustafson <[email protected]>, Satish Duggana 
<[email protected]>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  8 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  2 +-
 .../server/DynamicBrokerReconfigurationTest.scala  | 32 ++++++++++
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 11 +++-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 60 ++++++++++++++++++
 .../unit/kafka/server/ReplicaManagerTest.scala     | 74 ++++++++++++++++++++++
 .../internals/log/ProducerStateManagerConfig.java  | 12 +++-
 7 files changed, 191 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 57aaf396e85..610634f8598 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1111,10 +1111,14 @@ class DynamicListenerConfig(server: KafkaBroker) 
extends BrokerReconfigurable wi
 
 class DynamicProducerStateManagerConfig(val producerStateManagerConfig: 
ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
   def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
-    if (producerStateManagerConfig.producerIdExpirationMs() != 
newConfig.producerIdExpirationMs) {
-      info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from 
${producerStateManagerConfig.producerIdExpirationMs()} to 
${newConfig.producerIdExpirationMs}")
+    if (producerStateManagerConfig.producerIdExpirationMs != 
newConfig.producerIdExpirationMs) {
+      info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from 
${producerStateManagerConfig.producerIdExpirationMs} to 
${newConfig.producerIdExpirationMs}")
       
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs)
     }
+    if (producerStateManagerConfig.transactionVerificationEnabled != 
newConfig.transactionPartitionVerificationEnable) {
+      info(s"Reconfigure 
${KafkaConfig.TransactionPartitionVerificationEnableProp} from 
${producerStateManagerConfig.transactionVerificationEnabled} to 
${newConfig.transactionPartitionVerificationEnable}")
+      
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable)
+    }
   }
 
   def validateReconfiguration(newConfig: KafkaConfig): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a7cfc06f5b5..d690ac68fde 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2011,7 +2011,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val transactionAbortTimedOutTransactionCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
   val transactionRemoveExpiredTransactionalIdCleanupIntervalMs = 
getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
 
-  val transactionPartitionVerificationEnable = 
getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
+  def transactionPartitionVerificationEnable = 
getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
 
   def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
   val producerIdExpirationCheckIntervalMs = 
getInt(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp)
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c92287184a8..3bbfbbed78a 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1255,6 +1255,38 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     }
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTransactionVerificationEnable(quorum: String): Unit = {
+    def verifyConfiguration(enabled: Boolean): Unit = {
+      servers.foreach { server =>
+        TestUtils.waitUntilTrue(() => 
server.logManager.producerStateManagerConfig.transactionVerificationEnabled == 
enabled, "Configuration was not updated.")
+      }
+      verifyThreads("AddPartitionsToTxnSenderThread-", 1)
+    }
+    // Verification enabled by default
+    verifyConfiguration(true)
+
+    // Dynamically turn verification off.
+    val configPrefix = listenerPrefix(SecureExternal)
+    val updatedProps = securityProps(sslProperties1, KEYSTORE_PROPS, 
configPrefix)
+    updatedProps.put(KafkaConfig.TransactionPartitionVerificationEnableProp, 
"false")
+    alterConfigsUsingConfigCommand(updatedProps)
+    verifyConfiguration(false)
+
+    // Ensure it remains off after shutdown.
+    val shutdownServer = servers.head
+    shutdownServer.shutdown()
+    shutdownServer.awaitShutdown()
+    shutdownServer.startup()
+    verifyConfiguration(false)
+
+    // Turn verification back on.
+    updatedProps.put(KafkaConfig.TransactionPartitionVerificationEnableProp, 
"true")
+    alterConfigsUsingConfigCommand(updatedProps)
+    verifyConfiguration(true)
+  }
+
   private def verifyAddListener(listenerName: String, securityProtocol: 
SecurityProtocol,
                                 saslMechanisms: Seq[String]): Unit = {
     addListener(servers, listenerName, securityProtocol, saslMechanisms)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 64a8a5f72c7..f6e29a73428 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -32,6 +32,8 @@ import org.apache.kafka.common.utils.{MockTime, Utils}
 import org.apache.kafka.storage.internals.log.{AppendOrigin, CompletedTxn, 
LogFileUtils, LogOffsetMetadata, ProducerAppendInfo, ProducerStateEntry, 
ProducerStateManager, ProducerStateManagerConfig, TxnMetadata, 
VerificationStateEntry}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.Mockito.{mock, when}
 
 import java.util
@@ -1137,10 +1139,15 @@ class ProducerStateManagerTest {
     verifyEntry(producerId, updatedEntryOldEpoch, 2, 1)
   }
 
-  @Test
-  def testThrowOutOfOrderSequenceWithVerificationSequenceCheck(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testThrowOutOfOrderSequenceWithVerificationSequenceCheck(dynamicallyDisable: 
Boolean): Unit = {
     val originalEntry = 
stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
 
+    // Even if we dynamically disable, we should still execute the sequence 
check if we have an entry
+    if (dynamicallyDisable)
+      producerStateManagerConfig.setTransactionVerificationEnabled(false)
+
     // Trying to append with a higher sequence should fail
     assertThrows(classOf[OutOfOrderSequenceException], () => 
append(stateManager, producerId, 0, 4, offset = 0, isTransactional = true))
 
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 0104c55e4f2..c6806479a18 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -3770,6 +3770,66 @@ class UnifiedLogTest {
     assertNull(log.verificationGuard(producerId))
   }
 
+  @Test
+  def testDisabledVerificationClearsVerificationGuard(): Unit = {
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
true)
+
+    val producerId = 23L
+    val producerEpoch = 1.toShort
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
+
+    val verificationGuard = log.maybeStartTransactionVerification(producerId, 
0, producerEpoch)
+    assertNotNull(verificationGuard)
+
+    producerStateManagerConfig.setTransactionVerificationEnabled(false)
+
+    val transactionalRecords = MemoryRecords.withTransactionalRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      0,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+    log.appendAsLeader(transactionalRecords, leaderEpoch = 0)
+
+    assertTrue(log.hasOngoingTransaction(producerId))
+    assertNull(log.verificationGuard(producerId))
+  }
+
+  @Test
+  def testEnablingVerificationWhenRequestIsAtLogLayer(): Unit = {
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
false)
+
+    val producerId = 23L
+    val producerEpoch = 1.toShort
+    val sequence = 4
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
+
+    producerStateManagerConfig.setTransactionVerificationEnabled(true)
+
+    val transactionalRecords = MemoryRecords.withTransactionalRecords(
+      CompressionType.NONE,
+      producerId,
+      producerEpoch,
+      sequence,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+    assertThrows(classOf[InvalidTxnStateException], () => 
log.appendAsLeader(transactionalRecords, leaderEpoch = 0))
+    assertFalse(log.hasOngoingTransaction(producerId))
+    assertNull(log.verificationGuard(producerId))
+
+    val verificationGuard = log.maybeStartTransactionVerification(producerId, 
sequence, producerEpoch)
+    assertNotNull(verificationGuard)
+
+    log.appendAsLeader(transactionalRecords, leaderEpoch = 0, 
verificationGuard = verificationGuard)
+    assertTrue(log.hasOngoingTransaction(producerId))
+    assertNull(log.verificationGuard(producerId))
+  }
+
   @Test
   def testAllowNonZeroSequenceOnFirstAppendNonZeroEpoch(): Unit = {
     val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
true)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d9cd8cb0ab7..783c10b3b44 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2443,6 +2443,80 @@ class ReplicaManagerTest {
 
       // We should not add these partitions to the manager to verify.
       verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), 
any())
+
+      // Dynamically enable verification.
+      config.dynamicConfig.initialize(None)
+      val props = new Properties()
+      props.put(KafkaConfig.TransactionPartitionVerificationEnableProp, "true")
+      config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
+      TestUtils.waitUntilTrue(() => 
config.transactionPartitionVerificationEnable == true, "Config did not 
dynamically update.")
+
+      // Try to append more records. We don't need to send a request since the 
transaction is already ongoing.
+      val moreTransactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence + 1,
+        new SimpleRecord("message".getBytes))
+
+      appendRecords(replicaManager, tp, moreTransactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+      verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(), 
any())
+      assertEquals(null, getVerificationGuard(replicaManager, tp, producerId))
+      
assertTrue(replicaManager.localLog(tp).get.hasOngoingTransaction(producerId))
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @Test
+  def testTransactionVerificationDynamicDisablement(): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 6
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+
+      // Append some transactional records.
+      val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      // We should add these partitions to the manager to verify.
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+      val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+      verify(addPartitionsToTxnManager, 
times(1)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+      val verificationGuard = getVerificationGuard(replicaManager, tp0, 
producerId)
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
+
+      // Disable verification
+      config.dynamicConfig.initialize(None)
+      val props = new Properties()
+      props.put(KafkaConfig.TransactionPartitionVerificationEnableProp, 
"false")
+      config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
+      TestUtils.waitUntilTrue(() => 
config.transactionPartitionVerificationEnable == false, "Config did not 
dynamically update.")
+
+      // Confirm we did not write to the log and instead returned error.
+      val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+      callback(Map(tp0 -> Errors.INVALID_TXN_STATE).toMap)
+      assertEquals(Errors.INVALID_TXN_STATE, result.assertFired.error)
+      assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
+
+      // This time we do not verify
+      appendRecords(replicaManager, tp0, transactionalRecords, transactionalId 
= transactionalId, transactionStatePartition = Some(0))
+      verify(addPartitionsToTxnManager, times(1)).addTxnData(any(), any(), 
any())
+      assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+      
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
index 53e4c010d9d..2762d29b1a8 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
@@ -16,15 +16,17 @@
  */
 package org.apache.kafka.storage.internals.log;
 
-import java.util.Collections;
+import org.apache.kafka.common.utils.Utils;
+
 import java.util.Set;
 
 public class ProducerStateManagerConfig {
     public static final String PRODUCER_ID_EXPIRATION_MS = 
"producer.id.expiration.ms";
-    public static final Set<String> RECONFIGURABLE_CONFIGS = 
Collections.singleton(PRODUCER_ID_EXPIRATION_MS);
+    public static final String TRANSACTION_VERIFICATION_ENABLED = 
"transaction.partition.verification.enable";
+    public static final Set<String> RECONFIGURABLE_CONFIGS = 
Utils.mkSet(PRODUCER_ID_EXPIRATION_MS, TRANSACTION_VERIFICATION_ENABLED);
 
     private volatile int producerIdExpirationMs;
-    private boolean transactionVerificationEnabled;
+    private volatile boolean transactionVerificationEnabled;
 
     public ProducerStateManagerConfig(int producerIdExpirationMs, boolean 
transactionVerificationEnabled) {
         this.producerIdExpirationMs = producerIdExpirationMs;
@@ -35,6 +37,10 @@ public class ProducerStateManagerConfig {
         this.producerIdExpirationMs = producerIdExpirationMs;
     }
 
+    public void setTransactionVerificationEnabled(boolean 
transactionVerificationEnabled) {
+        this.transactionVerificationEnabled = transactionVerificationEnabled;
+    }
+
     public int producerIdExpirationMs() {
         return producerIdExpirationMs;
     }

Reply via email to