dajac commented on code in PR #13502: URL: https://github.com/apache/kafka/pull/13502#discussion_r1164508565
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -2113,6 +2113,115 @@ class KafkaApisTest { assertEquals(expectedErrors, response.errors()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = { + val topic = "topic" + + val transactionalId = "txnId1" + val producerId = 15L + val epoch = 0.toShort + + val tp = new TopicPartition(topic, 0) + + val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( + transactionalId, + producerId, + epoch, + Collections.singletonList(tp)).build(version) + else + AddPartitionsToTxnRequest.Builder.forBroker( + new AddPartitionsToTxnTransactionCollection( Review Comment: nit: indentation seems to be off. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -2113,6 +2113,115 @@ class KafkaApisTest { assertEquals(expectedErrors, response.errors()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = { + val topic = "topic" + + val transactionalId = "txnId1" + val producerId = 15L + val epoch = 0.toShort + + val tp = new TopicPartition(topic, 0) + + val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( + transactionalId, + producerId, + epoch, + Collections.singletonList(tp)).build(version) + else + AddPartitionsToTxnRequest.Builder.forBroker( + new AddPartitionsToTxnTransactionCollection( + List(new AddPartitionsToTxnTransaction() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(epoch) + .setVerifyOnly(true) + .setTopics(new AddPartitionsToTxnTopicCollection( + Collections.singletonList(new AddPartitionsToTxnTopic() + .setName(tp.topic) + .setPartitions(Collections.singletonList(tp.partition)) + ).iterator())) + ).asJava.iterator())).build(version) + + val requestChannelRequest = buildRequest(addPartitionsToTxnRequest) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava) + + createKafkaApis(authorizer = Some(authorizer)).handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest) + val error = if (version < 4) + response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp) + else + Errors.forCode(response.data().errorCode()) + + val expectedError = if (version < 4) Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED else Errors.CLUSTER_AUTHORIZATION_FAILED + assertEquals(expectedError, error) + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = { + val topic = "topic" + addTopicToMetadataCache(topic, numPartitions = 1) + + val transactionalId = "txnId1" + val producerId = 15L + val epoch = 0.toShort + + val tp0 = new TopicPartition(topic, 0) + val tp1 = new TopicPartition(topic, 1) + + val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( + transactionalId, + producerId, + epoch, + List(tp0, tp1).asJava).build(version) + else + AddPartitionsToTxnRequest.Builder.forBroker( + new AddPartitionsToTxnTransactionCollection( + List(new AddPartitionsToTxnTransaction() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(epoch) + .setVerifyOnly(true) + .setTopics(new AddPartitionsToTxnTopicCollection( + Collections.singletonList(new AddPartitionsToTxnTopic() + .setName(tp0.topic) + .setPartitions(List[Integer](tp0.partition, tp1.partition()).asJava) + ).iterator())) + ).asJava.iterator())).build(version) + + val requestChannelRequest = buildRequest(addPartitionsToTxnRequest) + + createKafkaApis().handleAddPartitionsToTxnRequest( + requestChannelRequest, + RequestLocal.NoCaching + ) + + val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest) + + def checkErrorForTp(tp: TopicPartition): Unit = { + val error = if (version < 4) + response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp) + else + response.errors().get(transactionalId).get(tp) + + val expectedError = if (tp == tp0) Errors.OPERATION_NOT_ATTEMPTED else Errors.UNKNOWN_TOPIC_OR_PARTITION + assertEquals(expectedError, error) + } + checkErrorForTp(tp1) Review Comment: nit: I would add an empty line before this one. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -2113,6 +2113,115 @@ class KafkaApisTest { assertEquals(expectedErrors, response.errors()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = { + val topic = "topic" + + val transactionalId = "txnId1" + val producerId = 15L + val epoch = 0.toShort + + val tp = new TopicPartition(topic, 0) + + val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( + transactionalId, + producerId, + epoch, + Collections.singletonList(tp)).build(version) + else + AddPartitionsToTxnRequest.Builder.forBroker( + new AddPartitionsToTxnTransactionCollection( + List(new AddPartitionsToTxnTransaction() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(epoch) + .setVerifyOnly(true) + .setTopics(new AddPartitionsToTxnTopicCollection( + Collections.singletonList(new AddPartitionsToTxnTopic() + .setName(tp.topic) + .setPartitions(Collections.singletonList(tp.partition)) + ).iterator())) + ).asJava.iterator())).build(version) + + val requestChannelRequest = buildRequest(addPartitionsToTxnRequest) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava) + + createKafkaApis(authorizer = Some(authorizer)).handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest) + val error = if (version < 4) + response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp) + else + Errors.forCode(response.data().errorCode()) + + val expectedError = if (version < 4) Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED else Errors.CLUSTER_AUTHORIZATION_FAILED + assertEquals(expectedError, error) + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = { + val topic = "topic" + addTopicToMetadataCache(topic, numPartitions = 1) + + val transactionalId = "txnId1" + val producerId = 15L + val epoch = 0.toShort + + val tp0 = new TopicPartition(topic, 0) + val tp1 = new TopicPartition(topic, 1) + + val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( + transactionalId, + producerId, + epoch, + List(tp0, tp1).asJava).build(version) + else + AddPartitionsToTxnRequest.Builder.forBroker( + new AddPartitionsToTxnTransactionCollection( + List(new AddPartitionsToTxnTransaction() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(epoch) + .setVerifyOnly(true) + .setTopics(new AddPartitionsToTxnTopicCollection( + Collections.singletonList(new AddPartitionsToTxnTopic() + .setName(tp0.topic) + .setPartitions(List[Integer](tp0.partition, tp1.partition()).asJava) + ).iterator())) + ).asJava.iterator())).build(version) + + val requestChannelRequest = buildRequest(addPartitionsToTxnRequest) + + createKafkaApis().handleAddPartitionsToTxnRequest( + requestChannelRequest, + RequestLocal.NoCaching + ) + + val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest) + + def checkErrorForTp(tp: TopicPartition): Unit = { + val error = if (version < 4) + response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp) + else + response.errors().get(transactionalId).get(tp) + + val expectedError = if (tp == tp0) Errors.OPERATION_NOT_ATTEMPTED else Errors.UNKNOWN_TOPIC_OR_PARTITION Review Comment: nit: Would it make sense to pass the expected error as an argument? The `if (tp == tp0)` is a bit nasty here. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -2113,6 +2113,115 @@ class KafkaApisTest { assertEquals(expectedErrors, response.errors()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = { + val topic = "topic" + + val transactionalId = "txnId1" + val producerId = 15L + val epoch = 0.toShort + + val tp = new TopicPartition(topic, 0) + + val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( + transactionalId, + producerId, + epoch, + Collections.singletonList(tp)).build(version) + else + AddPartitionsToTxnRequest.Builder.forBroker( + new AddPartitionsToTxnTransactionCollection( + List(new AddPartitionsToTxnTransaction() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(epoch) + .setVerifyOnly(true) + .setTopics(new AddPartitionsToTxnTopicCollection( + Collections.singletonList(new AddPartitionsToTxnTopic() + .setName(tp.topic) + .setPartitions(Collections.singletonList(tp.partition)) + ).iterator())) + ).asJava.iterator())).build(version) + + val requestChannelRequest = buildRequest(addPartitionsToTxnRequest) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava) + + createKafkaApis(authorizer = Some(authorizer)).handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest) + val error = if (version < 4) + response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp) + else + Errors.forCode(response.data().errorCode()) Review Comment: nit: You can omit the `()` here. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -2113,6 +2113,115 @@ class KafkaApisTest { assertEquals(expectedErrors, response.errors()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = { + val topic = "topic" + + val transactionalId = "txnId1" + val producerId = 15L + val epoch = 0.toShort + + val tp = new TopicPartition(topic, 0) + + val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( + transactionalId, + producerId, + epoch, + Collections.singletonList(tp)).build(version) + else + AddPartitionsToTxnRequest.Builder.forBroker( + new AddPartitionsToTxnTransactionCollection( + List(new AddPartitionsToTxnTransaction() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(epoch) + .setVerifyOnly(true) + .setTopics(new AddPartitionsToTxnTopicCollection( + Collections.singletonList(new AddPartitionsToTxnTopic() + .setName(tp.topic) + .setPartitions(Collections.singletonList(tp.partition)) + ).iterator())) + ).asJava.iterator())).build(version) + + val requestChannelRequest = buildRequest(addPartitionsToTxnRequest) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava) + + createKafkaApis(authorizer = Some(authorizer)).handle( + requestChannelRequest, + RequestLocal.NoCaching + ) Review Comment: nit: indentation is off. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org