This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b892acae5e0 KAFKA-15424: Make the transaction verification a dynamic
configuration (#14324)
b892acae5e0 is described below
commit b892acae5e026e1affd51ef9756772807674b964
Author: Justine Olshan <[email protected]>
AuthorDate: Mon Sep 4 20:40:50 2023 -0700
KAFKA-15424: Make the transaction verification a dynamic configuration
(#14324)
This will allow enabling and disabling transaction verification (KIP-890
part 1) without having to roll the cluster.
Tested that restarting the cluster persists the configuration.
If a verification is disabled/enabled while we have an inflight request,
depending on the step of the process, the change may or may not be seen in the
inflight request (enabling will typically fail unverified requests, but we may
still verify and reject when we first disable). Subsequent requests/retries will
behave as expected for verification.
Sequence checks will continue to take place after disabling until the first
message is written to the partition (thus clearing the verification entry with
the tentative sequence) or the broker restarts/partition is reassigned which
will clear the memory. On enabling, we will only track sequences for
requests received after the verification is enabled.
Reviewers: Jason Gustafson <[email protected]>, Satish Duggana
<[email protected]>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 8 ++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
.../server/DynamicBrokerReconfigurationTest.scala | 32 ++++++++++
.../unit/kafka/log/ProducerStateManagerTest.scala | 11 +++-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 60 ++++++++++++++++++
.../unit/kafka/server/ReplicaManagerTest.scala | 74 ++++++++++++++++++++++
.../internals/log/ProducerStateManagerConfig.java | 12 +++-
7 files changed, 191 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 57aaf396e85..610634f8598 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1111,10 +1111,14 @@ class DynamicListenerConfig(server: KafkaBroker)
extends BrokerReconfigurable wi
class DynamicProducerStateManagerConfig(val producerStateManagerConfig:
ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
- if (producerStateManagerConfig.producerIdExpirationMs() !=
newConfig.producerIdExpirationMs) {
- info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from
${producerStateManagerConfig.producerIdExpirationMs()} to
${newConfig.producerIdExpirationMs}")
+ if (producerStateManagerConfig.producerIdExpirationMs !=
newConfig.producerIdExpirationMs) {
+ info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from
${producerStateManagerConfig.producerIdExpirationMs} to
${newConfig.producerIdExpirationMs}")
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs)
}
+ if (producerStateManagerConfig.transactionVerificationEnabled !=
newConfig.transactionPartitionVerificationEnable) {
+ info(s"Reconfigure
${KafkaConfig.TransactionPartitionVerificationEnableProp} from
${producerStateManagerConfig.transactionVerificationEnabled} to
${newConfig.transactionPartitionVerificationEnable}")
+
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable)
+ }
}
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a7cfc06f5b5..d690ac68fde 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2011,7 +2011,7 @@ class KafkaConfig private(doLog: Boolean, val props:
java.util.Map[_, _], dynami
val transactionAbortTimedOutTransactionCleanupIntervalMs =
getInt(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp)
val transactionRemoveExpiredTransactionalIdCleanupIntervalMs =
getInt(KafkaConfig.TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp)
- val transactionPartitionVerificationEnable =
getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
+ def transactionPartitionVerificationEnable =
getBoolean(KafkaConfig.TransactionPartitionVerificationEnableProp)
def producerIdExpirationMs = getInt(KafkaConfig.ProducerIdExpirationMsProp)
val producerIdExpirationCheckIntervalMs =
getInt(KafkaConfig.ProducerIdExpirationCheckIntervalMsProp)
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c92287184a8..3bbfbbed78a 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1255,6 +1255,38 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
}
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTransactionVerificationEnable(quorum: String): Unit = {
+ def verifyConfiguration(enabled: Boolean): Unit = {
+ servers.foreach { server =>
+ TestUtils.waitUntilTrue(() =>
server.logManager.producerStateManagerConfig.transactionVerificationEnabled ==
enabled, "Configuration was not updated.")
+ }
+ verifyThreads("AddPartitionsToTxnSenderThread-", 1)
+ }
+ // Verification enabled by default
+ verifyConfiguration(true)
+
+ // Dynamically turn verification off.
+ val configPrefix = listenerPrefix(SecureExternal)
+ val updatedProps = securityProps(sslProperties1, KEYSTORE_PROPS,
configPrefix)
+ updatedProps.put(KafkaConfig.TransactionPartitionVerificationEnableProp,
"false")
+ alterConfigsUsingConfigCommand(updatedProps)
+ verifyConfiguration(false)
+
+ // Ensure it remains off after shutdown.
+ val shutdownServer = servers.head
+ shutdownServer.shutdown()
+ shutdownServer.awaitShutdown()
+ shutdownServer.startup()
+ verifyConfiguration(false)
+
+ // Turn verification back on.
+ updatedProps.put(KafkaConfig.TransactionPartitionVerificationEnableProp,
"true")
+ alterConfigsUsingConfigCommand(updatedProps)
+ verifyConfiguration(true)
+ }
+
private def verifyAddListener(listenerName: String, securityProtocol:
SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
addListener(servers, listenerName, securityProtocol, saslMechanisms)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 64a8a5f72c7..f6e29a73428 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -32,6 +32,8 @@ import org.apache.kafka.common.utils.{MockTime, Utils}
import org.apache.kafka.storage.internals.log.{AppendOrigin, CompletedTxn,
LogFileUtils, LogOffsetMetadata, ProducerAppendInfo, ProducerStateEntry,
ProducerStateManager, ProducerStateManagerConfig, TxnMetadata,
VerificationStateEntry}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.mockito.Mockito.{mock, when}
import java.util
@@ -1137,10 +1139,15 @@ class ProducerStateManagerTest {
verifyEntry(producerId, updatedEntryOldEpoch, 2, 1)
}
- @Test
- def testThrowOutOfOrderSequenceWithVerificationSequenceCheck(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testThrowOutOfOrderSequenceWithVerificationSequenceCheck(dynamicallyDisable:
Boolean): Unit = {
val originalEntry =
stateManager.maybeCreateVerificationStateEntry(producerId, 0, 0)
+ // Even if we dynamically disable, we should still execute the sequence
check if we have an entry
+ if (dynamicallyDisable)
+ producerStateManagerConfig.setTransactionVerificationEnabled(false)
+
// Trying to append with a higher sequence should fail
assertThrows(classOf[OutOfOrderSequenceException], () =>
append(stateManager, producerId, 0, 4, offset = 0, isTransactional = true))
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 0104c55e4f2..c6806479a18 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -3770,6 +3770,66 @@ class UnifiedLogTest {
assertNull(log.verificationGuard(producerId))
}
+ @Test
+ def testDisabledVerificationClearsVerificationGuard(): Unit = {
+ val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
true)
+
+ val producerId = 23L
+ val producerEpoch = 1.toShort
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+ val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
+
+ val verificationGuard = log.maybeStartTransactionVerification(producerId,
0, producerEpoch)
+ assertNotNull(verificationGuard)
+
+ producerStateManagerConfig.setTransactionVerificationEnabled(false)
+
+ val transactionalRecords = MemoryRecords.withTransactionalRecords(
+ CompressionType.NONE,
+ producerId,
+ producerEpoch,
+ 0,
+ new SimpleRecord("1".getBytes),
+ new SimpleRecord("2".getBytes)
+ )
+ log.appendAsLeader(transactionalRecords, leaderEpoch = 0)
+
+ assertTrue(log.hasOngoingTransaction(producerId))
+ assertNull(log.verificationGuard(producerId))
+ }
+
+ @Test
+ def testEnablingVerificationWhenRequestIsAtLogLayer(): Unit = {
+ val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
false)
+
+ val producerId = 23L
+ val producerEpoch = 1.toShort
+ val sequence = 4
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+ val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
+
+ producerStateManagerConfig.setTransactionVerificationEnabled(true)
+
+ val transactionalRecords = MemoryRecords.withTransactionalRecords(
+ CompressionType.NONE,
+ producerId,
+ producerEpoch,
+ sequence,
+ new SimpleRecord("1".getBytes),
+ new SimpleRecord("2".getBytes)
+ )
+ assertThrows(classOf[InvalidTxnStateException], () =>
log.appendAsLeader(transactionalRecords, leaderEpoch = 0))
+ assertFalse(log.hasOngoingTransaction(producerId))
+ assertNull(log.verificationGuard(producerId))
+
+ val verificationGuard = log.maybeStartTransactionVerification(producerId,
sequence, producerEpoch)
+ assertNotNull(verificationGuard)
+
+ log.appendAsLeader(transactionalRecords, leaderEpoch = 0,
verificationGuard = verificationGuard)
+ assertTrue(log.hasOngoingTransaction(producerId))
+ assertNull(log.verificationGuard(producerId))
+ }
+
@Test
def testAllowNonZeroSequenceOnFirstAppendNonZeroEpoch(): Unit = {
val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
true)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d9cd8cb0ab7..783c10b3b44 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2443,6 +2443,80 @@ class ReplicaManagerTest {
// We should not add these partitions to the manager to verify.
verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(),
any())
+
+ // Dynamically enable verification.
+ config.dynamicConfig.initialize(None)
+ val props = new Properties()
+ props.put(KafkaConfig.TransactionPartitionVerificationEnableProp, "true")
+ config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
+ TestUtils.waitUntilTrue(() =>
config.transactionPartitionVerificationEnable == true, "Config did not
dynamically update.")
+
+ // Try to append more records. We don't need to send a request since the
transaction is already ongoing.
+ val moreTransactionalRecords =
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
producerEpoch, sequence + 1,
+ new SimpleRecord("message".getBytes))
+
+ appendRecords(replicaManager, tp, moreTransactionalRecords,
transactionalId = transactionalId, transactionStatePartition = Some(0))
+ verify(addPartitionsToTxnManager, times(0)).addTxnData(any(), any(),
any())
+ assertEquals(null, getVerificationGuard(replicaManager, tp, producerId))
+
assertTrue(replicaManager.localLog(tp).get.hasOngoingTransaction(producerId))
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
+ @Test
+ def testTransactionVerificationDynamicDisablement(): Unit = {
+ val tp0 = new TopicPartition(topic, 0)
+ val producerId = 24L
+ val producerEpoch = 0.toShort
+ val sequence = 6
+ val node = new Node(0, "host1", 0)
+ val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+ val replicaManager =
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
List(tp0), node)
+ try {
+ replicaManager.becomeLeaderOrFollower(1,
+ makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1),
LeaderAndIsr(1, List(0, 1))),
+ (_, _) => ())
+
+ // Append some transactional records.
+ val transactionalRecords =
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
producerEpoch, sequence,
+ new SimpleRecord("message".getBytes))
+
+ val transactionToAdd = new AddPartitionsToTxnTransaction()
+ .setTransactionalId(transactionalId)
+ .setProducerId(producerId)
+ .setProducerEpoch(producerEpoch)
+ .setVerifyOnly(true)
+ .setTopics(new AddPartitionsToTxnTopicCollection(
+ Seq(new
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+ ))
+
+ // We should add these partitions to the manager to verify.
+ val result = appendRecords(replicaManager, tp0, transactionalRecords,
transactionalId = transactionalId, transactionStatePartition = Some(0))
+ val appendCallback =
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+ verify(addPartitionsToTxnManager,
times(1)).addTxnData(ArgumentMatchers.eq(node),
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+ val verificationGuard = getVerificationGuard(replicaManager, tp0,
producerId)
+ assertEquals(verificationGuard, getVerificationGuard(replicaManager,
tp0, producerId))
+
+ // Disable verification
+ config.dynamicConfig.initialize(None)
+ val props = new Properties()
+ props.put(KafkaConfig.TransactionPartitionVerificationEnableProp,
"false")
+ config.dynamicConfig.updateBrokerConfig(config.brokerId, props)
+ TestUtils.waitUntilTrue(() =>
config.transactionPartitionVerificationEnable == false, "Config did not
dynamically update.")
+
+ // Confirm we did not write to the log and instead returned error.
+ val callback: AddPartitionsToTxnManager.AppendCallback =
appendCallback.getValue()
+ callback(Map(tp0 -> Errors.INVALID_TXN_STATE).toMap)
+ assertEquals(Errors.INVALID_TXN_STATE, result.assertFired.error)
+ assertEquals(verificationGuard, getVerificationGuard(replicaManager,
tp0, producerId))
+
+ // This time we do not verify
+ appendRecords(replicaManager, tp0, transactionalRecords, transactionalId
= transactionalId, transactionStatePartition = Some(0))
+ verify(addPartitionsToTxnManager, times(1)).addTxnData(any(), any(),
any())
+ assertEquals(null, getVerificationGuard(replicaManager, tp0, producerId))
+
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
} finally {
replicaManager.shutdown(checkpointHW = false)
}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
index 53e4c010d9d..2762d29b1a8 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManagerConfig.java
@@ -16,15 +16,17 @@
*/
package org.apache.kafka.storage.internals.log;
-import java.util.Collections;
+import org.apache.kafka.common.utils.Utils;
+
import java.util.Set;
public class ProducerStateManagerConfig {
public static final String PRODUCER_ID_EXPIRATION_MS =
"producer.id.expiration.ms";
- public static final Set<String> RECONFIGURABLE_CONFIGS =
Collections.singleton(PRODUCER_ID_EXPIRATION_MS);
+ public static final String TRANSACTION_VERIFICATION_ENABLED =
"transaction.partition.verification.enable";
+ public static final Set<String> RECONFIGURABLE_CONFIGS =
Utils.mkSet(PRODUCER_ID_EXPIRATION_MS, TRANSACTION_VERIFICATION_ENABLED);
private volatile int producerIdExpirationMs;
- private boolean transactionVerificationEnabled;
+ private volatile boolean transactionVerificationEnabled;
public ProducerStateManagerConfig(int producerIdExpirationMs, boolean
transactionVerificationEnabled) {
this.producerIdExpirationMs = producerIdExpirationMs;
@@ -35,6 +37,10 @@ public class ProducerStateManagerConfig {
this.producerIdExpirationMs = producerIdExpirationMs;
}
+ public void setTransactionVerificationEnabled(boolean
transactionVerificationEnabled) {
+ this.transactionVerificationEnabled = transactionVerificationEnabled;
+ }
+
public int producerIdExpirationMs() {
return producerIdExpirationMs;
}