[GitHub] [kafka] showuon commented on a diff in pull request #12087: KAFKA-13851: Add integration tests for DeleteRecords API
showuon commented on code in PR #12087: URL: https://github.com/apache/kafka/pull/12087#discussion_r871398002 ## core/src/test/scala/unit/kafka/server/DeleteRecordsRequestTest.scala: ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.utils.TestInfoUtils +import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.DeleteRecordsRequestData +import org.apache.kafka.common.message.DeleteRecordsRequestData.{DeleteRecordsPartition, DeleteRecordsTopic} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse} +import org.apache.kafka.common.serialization.StringSerializer +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.Collections +import java.util.concurrent.{Future, TimeUnit} +import scala.collection.Seq + +class DeleteRecordsRequestTest extends BaseRequestTest { + private val TIMEOUT_MS = 1000 + private val MESSAGES_PRODUCED_PER_PARTITION = 10 + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDeleteRecordsHappyCase(quorum: String): Unit = { +val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords + +// Create the DeleteRecords request requesting deletion up to a valid offset, i.e. one within the produced records +val offsetToDelete = Math.max(MESSAGES_PRODUCED_PER_PARTITION - 8, 0) +val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete) + +// call the API +val response = sendDeleteRecordsRequest(request, leaderId) +val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition) + +// Validate the expected error code in the response +assertEquals(Errors.NONE.code(), partitionResult.errorCode(), + s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode).name()}") + +// Validate the expected lowWaterMark in the response 
+assertEquals(offsetToDelete, partitionResult.lowWatermark(), + s"Unexpected lowWatermark received: ${partitionResult.lowWatermark}") + +// Validate that the records have actually been deleted +validateLogStartOffsetForTopic(topicPartition, offsetToDelete) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testErrorWhenDeletingRecordsWithInvalidOffset(quorum: String): Unit = { +val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords + +// Create the DeleteRecord request requesting deletion of offset which is not present +val offsetToDelete = MESSAGES_PRODUCED_PER_PARTITION + 5 +val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete) + +// call the API +val response = sendDeleteRecordsRequest(request, leaderId) +val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition) + +// Validate the expected error code in the response +assertEquals(Errors.OFFSET_OUT_OF_RANGE.code(), partitionResult.errorCode(), + s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode()).name()}") + +// Validate the expected value for low watermark +assertEquals(DeleteRecordsResponse.INVALID_LOW_WATERMARK, partitionResult.lowWatermark()) + +// After the error, the log start offset should remain at its original value, i.e. the request should not have deleted +// any records. +validateLogStartOffsetForTopic(topicPartition, 0) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testErrorWhenDeletingRecordsWithInvalidTopic(quorum: String): Unit = { +val invalidTopicPartition = new TopicPartition("invalid-topic", 0) +// Create
[GitHub] [kafka] showuon commented on a diff in pull request #12087: KAFKA-13851: Add integration tests for DeleteRecords API
showuon commented on code in PR #12087: URL: https://github.com/apache/kafka/pull/12087#discussion_r870126321 ## core/src/test/scala/unit/kafka/server/DeleteRecordsRequestTest.scala: ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.utils.TestInfoUtils +import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.DeleteRecordsRequestData +import org.apache.kafka.common.message.DeleteRecordsRequestData.{DeleteRecordsPartition, DeleteRecordsTopic} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse} +import org.apache.kafka.common.serialization.StringSerializer +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.util.Collections +import java.util.concurrent.{Future, TimeUnit} +import scala.collection.Seq + +class DeleteRecordsRequestTest extends BaseRequestTest { + private val TIMEOUT_MS = 1000 + private val MESSAGES_PRODUCED_PER_PARTITION = 10 + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDeleteRecordsHappyCase(quorum: String): Unit = { +val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords + +// Create the DeleteRecords request requesting deletion up to a valid offset, i.e. one within the produced records +val offsetToDelete = Math.max(MESSAGES_PRODUCED_PER_PARTITION - 8, 0) +val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete) + +// call the API +val response = sendDeleteRecordsRequest(request, leaderId) +val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition) + +// Validate the expected error code in the response +assertEquals(Errors.NONE.code(), partitionResult.errorCode(), + s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode).name()}") + +// Validate the expected lowWaterMark in the response 
+assertEquals(offsetToDelete, partitionResult.lowWatermark(), + s"Unexpected lowWatermark received: ${partitionResult.lowWatermark}") + +// Validate that the records have actually been deleted +validateLogStartOffsetForTopic(topicPartition, offsetToDelete) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testErrorWhenDeletingRecordsWithInvalidOffset(quorum: String): Unit = { +val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords + +// Create the DeleteRecord request requesting deletion of offset which is not present +val offsetToDelete = MESSAGES_PRODUCED_PER_PARTITION + 5 +val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete) + +// call the API +val response = sendDeleteRecordsRequest(request, leaderId) +val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition) + +// Validate the expected error code in the response +assertEquals(Errors.OFFSET_OUT_OF_RANGE.code(), partitionResult.errorCode(), + s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode()).name()}") + +// Validate the expected value for low watermark +assertEquals(DeleteRecordsResponse.INVALID_LOW_WATERMARK, partitionResult.lowWatermark()) + +// After the error, the log start offset should remain at its original value, i.e. the request should not have deleted +// any records. +validateLogStartOffsetForTopic(topicPartition, 0) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testErrorWhenDeletingRecordsWithInvalidTopic(quorum: String): Unit = { +val invalidTopicPartition = new TopicPartition("invalid-topic", 0) +// Create
[GitHub] [kafka] showuon commented on a diff in pull request #12087: KAFKA-13851: Add integration tests for DeleteRecords API
showuon commented on code in PR #12087: URL: https://github.com/apache/kafka/pull/12087#discussion_r870132658 ## core/src/test/scala/unit/kafka/server/DeleteRecordsRequestTest.scala: ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.common.{IsolationLevel, TopicPartition} +import org.apache.kafka.common.message.DeleteRecordsRequestData +import org.apache.kafka.common.message.DeleteRecordsRequestData.{DeleteRecordsPartition, DeleteRecordsTopic} +import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse, ListOffsetsRequest, ListOffsetsResponse} +import org.apache.kafka.common.serialization.StringSerializer +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} + +import java.util.Collections +import scala.collection.Seq +import scala.jdk.CollectionConverters._ + +class DeleteRecordsRequestTest extends BaseRequestTest { + private val TIMEOUT_MS = 1000 + private val MESSAGES_PRODUCED_PER_PARTITION = 10 + private var producer: KafkaProducer[String, String] = null + + @BeforeEach + override def setUp(testInfo: TestInfo): Unit = { +super.setUp(testInfo) +producer = TestUtils.createProducer(bootstrapServers(), + keySerializer = new StringSerializer, valueSerializer = new StringSerializer) + } + + @AfterEach + override def tearDown(): Unit = { +if (producer != null) + producer.close() +super.tearDown() + } + + @Test + def testDeleteRecordsHappyCase(): Unit = { +val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords + +// Create the DeleteRecords request requesting deletion up to a valid offset, i.e. one within the produced records +val offsetToDelete = Math.max(MESSAGES_PRODUCED_PER_PARTITION - 8, 0) +val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete) + +// call the API +val response = 
sendDeleteRecordsRequest(request, leaderId) +val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition) + +// Validate the expected error code in the response +assertEquals(Errors.NONE.code(), partitionResult.errorCode(), + s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode).name()}") + +// Validate the expected lowWaterMark in the response +assertEquals(offsetToDelete, partitionResult.lowWatermark(), + s"Unexpected lowWatermark received: ${partitionResult.lowWatermark}") + +// Validate that the records have actually been deleted +validateRecordsAreDeleted(topicPartition, leaderId, offsetToDelete) + } + + @Test + def testErrorWhenDeletingRecordsWithInvalidOffset(): Unit = { +val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords + +// Create the DeleteRecord request requesting deletion of offset which is not present +val offsetToDelete = MESSAGES_PRODUCED_PER_PARTITION + 5 +val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete) + +// call the API +val response = sendDeleteRecordsRequest(request, leaderId) +val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition) + +// Validate the expected error code in the response +assertEquals(Errors.OFFSET_OUT_OF_RANGE.code(), partitionResult.errorCode(), + s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode()).name()}") + } + + @Test + def testErrorWhenDeletingRecordsWithInvalidTopic(): Unit = { +val (_, leaderId: Int) = createTopicAndSendRecords + +val invalidTopicPartition = new TopicPartition("invalid-topic", 0) +// Create the DeleteRecord request requesting deletion of offset which is not present +val