junrao commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1163287233
########## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ########## @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.server + +import kafka.common.RequestAndCompletionHandler +import kafka.server.{AddPartitionsToTxnManager, KafkaConfig} +import kafka.utils.TestUtils +import org.apache.kafka.clients.{ClientResponse, NetworkClient} +import org.apache.kafka.common.errors.{AuthenticationException, SaslAuthenticationException, UnsupportedVersionException} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.message.AddPartitionsToTxnResponseData +import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResultCollection +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AbstractResponse, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.MockTime +import org.junit.jupiter.api.Assertions.assertEquals +import 
org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.mockito.Mockito.mock + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +class AddPartitionsToTxnManagerTest { + private val networkClient: NetworkClient = mock(classOf[NetworkClient]) + + private val time = new MockTime + + private var addPartitionsToTxnManager: AddPartitionsToTxnManager = _ + + val topic = "foo" + val topicPartitions = List(new TopicPartition(topic, 1), new TopicPartition(topic, 2), new TopicPartition(topic, 3)) + + private val node0 = new Node(0, "host1", 0) + private val node1 = new Node(1, "host2", 1) + private val node2 = new Node(2, "host2", 2) + + private val transactionalId1 = "txn1" + private val transactionalId2 = "txn2" + private val transactionalId3 = "txn3" + + private val producerId1 = 0L + private val producerId2 = 1L + private val producerId3 = 2L + + private val authenticationErrorResponse = clientResponse(null, authException = new SaslAuthenticationException("")) + private val versionMismatchResponse = clientResponse(null, mismatchException = new UnsupportedVersionException("")) + + @BeforeEach + def setup(): Unit = { + addPartitionsToTxnManager = new AddPartitionsToTxnManager( + KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")), + networkClient, + time) + } + + @AfterEach + def teardown(): Unit = { + addPartitionsToTxnManager.shutdown() + } + + def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { Review Comment: Could this be private? Ditto for a few other helper methods in this file. ########## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ########## @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.server + +import kafka.common.RequestAndCompletionHandler +import kafka.server.{AddPartitionsToTxnManager, KafkaConfig} +import kafka.utils.TestUtils +import org.apache.kafka.clients.{ClientResponse, NetworkClient} +import org.apache.kafka.common.errors.{AuthenticationException, SaslAuthenticationException, UnsupportedVersionException} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.message.AddPartitionsToTxnResponseData +import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResultCollection +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AbstractResponse, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.MockTime +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.mockito.Mockito.mock + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +class AddPartitionsToTxnManagerTest { + private val networkClient: NetworkClient = mock(classOf[NetworkClient]) + + private val time = new MockTime + + private var 
addPartitionsToTxnManager: AddPartitionsToTxnManager = _ + + val topic = "foo" + val topicPartitions = List(new TopicPartition(topic, 1), new TopicPartition(topic, 2), new TopicPartition(topic, 3)) + + private val node0 = new Node(0, "host1", 0) + private val node1 = new Node(1, "host2", 1) + private val node2 = new Node(2, "host2", 2) + + private val transactionalId1 = "txn1" + private val transactionalId2 = "txn2" + private val transactionalId3 = "txn3" + + private val producerId1 = 0L + private val producerId2 = 1L + private val producerId3 = 2L + + private val authenticationErrorResponse = clientResponse(null, authException = new SaslAuthenticationException("")) + private val versionMismatchResponse = clientResponse(null, mismatchException = new UnsupportedVersionException("")) + + @BeforeEach + def setup(): Unit = { + addPartitionsToTxnManager = new AddPartitionsToTxnManager( + KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")), + networkClient, + time) + } + + @AfterEach + def teardown(): Unit = { + addPartitionsToTxnManager.shutdown() + } + + def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { + callbackErrors.foreach { + case (tp, error) => errors.put(tp, error) + } + } + + @Test + def testAddTxnData(): Unit = { + val transaction1Errors = mutable.Map[TopicPartition, Errors]() + val transaction2Errors = mutable.Map[TopicPartition, Errors]() + val transaction3Errors = mutable.Map[TopicPartition, Errors]() + + addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transaction1Errors)) + addPartitionsToTxnManager.addTxnData(node1, transactionData(transactionalId2, producerId2), setErrors(transaction2Errors)) + addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId3, producerId3), setErrors(transaction3Errors)) + + val transaction1AgainErrorsOldEpoch = mutable.Map[TopicPartition, Errors]() + val 
transaction1AgainErrorsNewEpoch = mutable.Map[TopicPartition, Errors]() + // Trying to add more transactional data for the same transactional ID, producer ID, and epoch should simply replace the old data. The error map should remain empty. Review Comment: "The error map should remain empty." What does this mean? ########## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ########## @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package unit.kafka.server + +import kafka.common.RequestAndCompletionHandler +import kafka.server.{AddPartitionsToTxnManager, KafkaConfig} +import kafka.utils.TestUtils +import org.apache.kafka.clients.{ClientResponse, NetworkClient} +import org.apache.kafka.common.errors.{AuthenticationException, SaslAuthenticationException, UnsupportedVersionException} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.message.AddPartitionsToTxnResponseData +import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResultCollection +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AbstractResponse, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.MockTime +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.mockito.Mockito.mock + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +class AddPartitionsToTxnManagerTest { + private val networkClient: NetworkClient = mock(classOf[NetworkClient]) + + private val time = new MockTime + + private var addPartitionsToTxnManager: AddPartitionsToTxnManager = _ + + val topic = "foo" + val topicPartitions = List(new TopicPartition(topic, 1), new TopicPartition(topic, 2), new TopicPartition(topic, 3)) + + private val node0 = new Node(0, "host1", 0) + private val node1 = new Node(1, "host2", 1) + private val node2 = new Node(2, "host2", 2) + + private val transactionalId1 = "txn1" + private val transactionalId2 = "txn2" + private val transactionalId3 = "txn3" + + private val producerId1 = 0L + private val producerId2 = 1L + private val producerId3 = 2L + + private val authenticationErrorResponse 
= clientResponse(null, authException = new SaslAuthenticationException("")) + private val versionMismatchResponse = clientResponse(null, mismatchException = new UnsupportedVersionException("")) + + @BeforeEach + def setup(): Unit = { + addPartitionsToTxnManager = new AddPartitionsToTxnManager( + KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")), + networkClient, + time) + } + + @AfterEach + def teardown(): Unit = { + addPartitionsToTxnManager.shutdown() + } + + def setErrors(errors: mutable.Map[TopicPartition, Errors])(callbackErrors: Map[TopicPartition, Errors]): Unit = { + callbackErrors.foreach { + case (tp, error) => errors.put(tp, error) + } + } + + @Test + def testAddTxnData(): Unit = { + val transaction1Errors = mutable.Map[TopicPartition, Errors]() + val transaction2Errors = mutable.Map[TopicPartition, Errors]() + val transaction3Errors = mutable.Map[TopicPartition, Errors]() + + addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transaction1Errors)) + addPartitionsToTxnManager.addTxnData(node1, transactionData(transactionalId2, producerId2), setErrors(transaction2Errors)) + addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId3, producerId3), setErrors(transaction3Errors)) + + val transaction1AgainErrorsOldEpoch = mutable.Map[TopicPartition, Errors]() + val transaction1AgainErrorsNewEpoch = mutable.Map[TopicPartition, Errors]() + // Trying to add more transactional data for the same transactional ID, producer ID, and epoch should simply replace the old data. The error map should remain empty. 
+ addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1), setErrors(transaction1AgainErrorsOldEpoch)) + val expectedNetworkErrors = topicPartitions.map(_ -> Errors.NETWORK_EXCEPTION).toMap + assertEquals(expectedNetworkErrors, transaction1Errors) + + // Trying to add more transactional data for the same transactional ID and producer ID, but new epoch should replace the old data and send an error response for it. + addPartitionsToTxnManager.addTxnData(node0, transactionData(transactionalId1, producerId1, producerEpoch = 1), setErrors(transaction1AgainErrorsNewEpoch)) + Review Comment: Get rid of the new line since the following validation is related to this action? ########## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ########## @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { + nodesToTransactions.synchronized { Review Comment: Could we file a jira to track this in a followup? ########## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ########## @@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int, completeShutdown() return + case callback: RequestChannel.CallbackRequest => + try { + val originalRequest = callback.originalRequest + + // If we've already executed a callback for this request, reset the times and subtract the callback time from the + // new dequeue time. This will allow calculation of multiple callback times. 
+ // Otherwise, set dequeue time to now. + if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) { + val prevCallbacksTimeNanos = originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L) + originalRequest.callbackRequestCompleteTimeNanos = None + originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds() - prevCallbacksTimeNanos) + } else { + originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds()) + } + + currentRequest.set(originalRequest) + callback.fun() + if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty) Review Comment: Hmm, not sure that I follow. In the false case, it seems that we just don't set `callbackRequestCompleteTimeNanos`? This is a bit weird since the callback is completed. ########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -2053,6 +2056,190 @@ class ReplicaManagerTest { assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, produceResult.get.error) } + @Test + def testVerificationForTransactionalPartitions(): Unit = { + val tp = new TopicPartition(topic, 0) + val transactionalId = "txn1" + val producerId = 24L + val producerEpoch = 0.toShort + val sequence = 0 + + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) + val metadataCache = mock(classOf[MetadataCache]) + val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + + val replicaManager = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = metadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager, + addPartitionsToTxnManager = Some(addPartitionsToTxnManager)) + + try { + val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), LeaderAndIsr(1, List(0, 1))) + 
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) + + // We must set up the metadata cache to handle the append and verification. + val metadataResponseTopic = Seq(new MetadataResponseTopic() + .setName(Topic.TRANSACTION_STATE_TOPIC_NAME) + .setPartitions(Seq( + new MetadataResponsePartition() + .setPartitionIndex(0) + .setLeaderId(0)).asJava)) + val node = new Node(0, "host1", 0) + + when(metadataCache.contains(tp)).thenReturn(true) + when(metadataCache.getTopicMetadata(Set(Topic.TRANSACTION_STATE_TOPIC_NAME), config.interBrokerListenerName)).thenReturn(metadataResponseTopic) + when(metadataCache.getAliveBrokerNode(0, config.interBrokerListenerName)).thenReturn(Some(node)) + when(metadataCache.getAliveBrokerNode(1, config.interBrokerListenerName)).thenReturn(None) + + // We will attempt to schedule to the request handler thread using a non request handler thread. Set this to avoid error. + KafkaRequestHandler.setBypassThreadCheck(true) Review Comment: Do we need this? It doesn't seem the request handler thread is involved in the test. 
########## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ########## @@ -1184,11 +1184,25 @@ class PartitionTest extends AbstractPartitionTest { builder.build() } - def createTransactionalRecords(records: Iterable[SimpleRecord], - baseOffset: Long): MemoryRecords = { - val producerId = 1L + def createIdempotentRecords(records: Iterable[SimpleRecord], + baseOffset: Long, + baseSequence: Int = 0, + producerId: Long = 1L): MemoryRecords = { val producerEpoch = 0.toShort val baseSequence = 0 + val isTransactional = false + val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) + val builder = MemoryRecords.builder(buf, CompressionType.NONE, baseOffset, producerId, + producerEpoch, baseSequence, isTransactional) + records.foreach(builder.append) + builder.build() + } + + def createTransactionalRecords(records: Iterable[SimpleRecord], + baseOffset: Long, + baseSequence: Int = 0, Review Comment: `baseSequence` is redefined as a local `val` inside the method body, so this new parameter is shadowed and never used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org