[GitHub] [kafka] showuon commented on a diff in pull request #12087: KAFKA-13851: Add integration tests for DeleteRecords API

2022-05-12 Thread GitBox


showuon commented on code in PR #12087:
URL: https://github.com/apache/kafka/pull/12087#discussion_r871398002


##
core/src/test/scala/unit/kafka/server/DeleteRecordsRequestTest.scala:
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestInfoUtils
+import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.DeleteRecordsRequestData
+import org.apache.kafka.common.message.DeleteRecordsRequestData.{DeleteRecordsPartition, DeleteRecordsTopic}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.Collections
+import java.util.concurrent.{Future, TimeUnit}
+import scala.collection.Seq
+
+class DeleteRecordsRequestTest extends BaseRequestTest {
+  private val TIMEOUT_MS = 1000
+  private val MESSAGES_PRODUCED_PER_PARTITION = 10
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteRecordsHappyCase(quorum: String): Unit = {
+    val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords
+
+    // Create the DeleteRecords request requesting deletion up to an offset that is present in the log
+    val offsetToDelete = Math.max(MESSAGES_PRODUCED_PER_PARTITION - 8, 0)
+    val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete)
+
+    // Call the API
+    val response = sendDeleteRecordsRequest(request, leaderId)
+    val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition)
+
+    // Validate the expected error code in the response
+    assertEquals(Errors.NONE.code(), partitionResult.errorCode(),
+      s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode).name()}")
+
+    // Validate the expected lowWatermark in the response
+    assertEquals(offsetToDelete, partitionResult.lowWatermark(),
+      s"Unexpected lowWatermark received: ${partitionResult.lowWatermark}")
+
+    // Validate that the records have actually been deleted
+    validateLogStartOffsetForTopic(topicPartition, offsetToDelete)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testErrorWhenDeletingRecordsWithInvalidOffset(quorum: String): Unit = {
+    val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords
+
+    // Create the DeleteRecords request requesting deletion of an offset which is not present in the log
+    val offsetToDelete = MESSAGES_PRODUCED_PER_PARTITION + 5
+    val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete)
+
+    // Call the API
+    val response = sendDeleteRecordsRequest(request, leaderId)
+    val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition)
+
+    // Validate the expected error code in the response
+    assertEquals(Errors.OFFSET_OUT_OF_RANGE.code(), partitionResult.errorCode(),
+      s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode()).name()}")
+
+    // Validate the expected value for low watermark
+    assertEquals(DeleteRecordsResponse.INVALID_LOW_WATERMARK, partitionResult.lowWatermark())
+
+    // After the error, the log start offset of the topic should be unchanged, i.e. DeleteRecords should not have
+    // deleted any records.
+    validateLogStartOffsetForTopic(topicPartition, 0)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testErrorWhenDeletingRecordsWithInvalidTopic(quorum: String): Unit = {
+    val invalidTopicPartition = new TopicPartition("invalid-topic", 0)
+    // Create 
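
The helpers referenced above (createTopicAndSendRecords, createDeleteRecordsRequestForTopicPartition, sendDeleteRecordsRequest, validateLogStartOffsetForTopic) sit outside this hunk. A minimal sketch of how such a request can be assembled from DeleteRecordsRequestData, consistent with the Collections, DeleteRecordsTopic and DeleteRecordsPartition imports in the diff (the exact helper body is an assumption, not taken from the patch):

  // Hypothetical sketch, not part of the quoted patch: build a DeleteRecordsRequest that asks the
  // leader to advance the log start offset of a single topic-partition to offsetToDelete.
  private def createDeleteRecordsRequestForTopicPartition(topicPartition: TopicPartition,
                                                          offsetToDelete: Long): DeleteRecordsRequest = {
    val requestData = new DeleteRecordsRequestData()
      .setTimeoutMs(TIMEOUT_MS)
      .setTopics(Collections.singletonList(new DeleteRecordsTopic()
        .setName(topicPartition.topic)
        .setPartitions(Collections.singletonList(new DeleteRecordsPartition()
          .setPartitionIndex(topicPartition.partition)
          .setOffset(offsetToDelete)))))
    new DeleteRecordsRequest.Builder(requestData).build()
  }

TIMEOUT_MS is the constant declared at the top of the test class; everything else used in the sketch comes from the imports already present in the file.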

[GitHub] [kafka] showuon commented on a diff in pull request #12087: KAFKA-13851: Add integration tests for DeleteRecords API

2022-05-11 Thread GitBox


showuon commented on code in PR #12087:
URL: https://github.com/apache/kafka/pull/12087#discussion_r870126321


##
core/src/test/scala/unit/kafka/server/DeleteRecordsRequestTest.scala:
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestInfoUtils
+import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.DeleteRecordsRequestData
+import org.apache.kafka.common.message.DeleteRecordsRequestData.{DeleteRecordsPartition, DeleteRecordsTopic}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.Collections
+import java.util.concurrent.{Future, TimeUnit}
+import scala.collection.Seq
+
+class DeleteRecordsRequestTest extends BaseRequestTest {
+  private val TIMEOUT_MS = 1000
+  private val MESSAGES_PRODUCED_PER_PARTITION = 10
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteRecordsHappyCase(quorum: String): Unit = {
+    val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords
+
+    // Create the DeleteRecords request requesting deletion up to an offset that is present in the log
+    val offsetToDelete = Math.max(MESSAGES_PRODUCED_PER_PARTITION - 8, 0)
+    val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete)
+
+    // Call the API
+    val response = sendDeleteRecordsRequest(request, leaderId)
+    val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition)
+
+    // Validate the expected error code in the response
+    assertEquals(Errors.NONE.code(), partitionResult.errorCode(),
+      s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode).name()}")
+
+    // Validate the expected lowWatermark in the response
+    assertEquals(offsetToDelete, partitionResult.lowWatermark(),
+      s"Unexpected lowWatermark received: ${partitionResult.lowWatermark}")
+
+    // Validate that the records have actually been deleted
+    validateLogStartOffsetForTopic(topicPartition, offsetToDelete)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testErrorWhenDeletingRecordsWithInvalidOffset(quorum: String): Unit = {
+    val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords
+
+    // Create the DeleteRecords request requesting deletion of an offset which is not present in the log
+    val offsetToDelete = MESSAGES_PRODUCED_PER_PARTITION + 5
+    val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete)
+
+    // Call the API
+    val response = sendDeleteRecordsRequest(request, leaderId)
+    val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition)
+
+    // Validate the expected error code in the response
+    assertEquals(Errors.OFFSET_OUT_OF_RANGE.code(), partitionResult.errorCode(),
+      s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode()).name()}")
+
+    // Validate the expected value for low watermark
+    assertEquals(DeleteRecordsResponse.INVALID_LOW_WATERMARK, partitionResult.lowWatermark())
+
+    // After the error, the log start offset of the topic should be unchanged, i.e. DeleteRecords should not have
+    // deleted any records.
+    validateLogStartOffsetForTopic(topicPartition, 0)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testErrorWhenDeletingRecordsWithInvalidTopic(quorum: String): Unit = {
+    val invalidTopicPartition = new TopicPartition("invalid-topic", 0)
+    // Create 
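
The send and validation helpers are likewise outside the hunk. A minimal sketch, assuming BaseRequestTest exposes connectAndReceive and brokerSocketServer, and that the brokers/replicaManager/logManager accessors used below exist on the test harness (these names are assumptions, not taken from the patch):

  // Hypothetical sketch, not part of the quoted patch: send the request straight to the partition
  // leader and decode the DeleteRecordsResponse.
  private def sendDeleteRecordsRequest(request: DeleteRecordsRequest, leaderId: Int): DeleteRecordsResponse = {
    connectAndReceive[DeleteRecordsResponse](request, destination = brokerSocketServer(leaderId))
  }

  // Hypothetical sketch: assert that every replica hosting the partition has moved its log start
  // offset up to the expected value, so the records below it are no longer readable.
  private def validateLogStartOffsetForTopic(topicPartition: TopicPartition, expectedStartOffset: Long): Unit = {
    brokers.foreach { broker =>
      broker.replicaManager.logManager.getLog(topicPartition).foreach { log =>
        assertEquals(expectedStartOffset, log.logStartOffset)
      }
    }
  }

Sending to the leader matters here: DeleteRecords must be handled by the partition leader, and a non-leader broker would answer the partition with NOT_LEADER_OR_FOLLOWER.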

[GitHub] [kafka] showuon commented on a diff in pull request #12087: KAFKA-13851: Add integration tests for DeleteRecords API

2022-05-11 Thread GitBox


showuon commented on code in PR #12087:
URL: https://github.com/apache/kafka/pull/12087#discussion_r870132658


##
core/src/test/scala/unit/kafka/server/DeleteRecordsRequestTest.scala:
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
+import org.apache.kafka.common.{IsolationLevel, TopicPartition}
+import org.apache.kafka.common.message.DeleteRecordsRequestData
+import org.apache.kafka.common.message.DeleteRecordsRequestData.{DeleteRecordsPartition, DeleteRecordsTopic}
+import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse, ListOffsetsRequest, ListOffsetsResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+
+import java.util.Collections
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class DeleteRecordsRequestTest extends BaseRequestTest {
+  private val TIMEOUT_MS = 1000
+  private val MESSAGES_PRODUCED_PER_PARTITION = 10
+  private var producer: KafkaProducer[String, String] = null
+
+  @BeforeEach
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
+    producer = TestUtils.createProducer(bootstrapServers(),
+      keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    if (producer != null)
+      producer.close()
+    super.tearDown()
+  }
+
+  @Test
+  def testDeleteRecordsHappyCase(): Unit = {
+    val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords
+
+    // Create the DeleteRecords request requesting deletion up to an offset that is present in the log
+    val offsetToDelete = Math.max(MESSAGES_PRODUCED_PER_PARTITION - 8, 0)
+    val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete)
+
+    // Call the API
+    val response = sendDeleteRecordsRequest(request, leaderId)
+    val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition)
+
+    // Validate the expected error code in the response
+    assertEquals(Errors.NONE.code(), partitionResult.errorCode(),
+      s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode).name()}")
+
+    // Validate the expected lowWatermark in the response
+    assertEquals(offsetToDelete, partitionResult.lowWatermark(),
+      s"Unexpected lowWatermark received: ${partitionResult.lowWatermark}")
+
+    // Validate that the records have actually been deleted
+    validateRecordsAreDeleted(topicPartition, leaderId, offsetToDelete)
+  }
+
+  @Test
+  def testErrorWhenDeletingRecordsWithInvalidOffset(): Unit = {
+    val (topicPartition: TopicPartition, leaderId: Int) = createTopicAndSendRecords
+
+    // Create the DeleteRecords request requesting deletion of an offset which is not present in the log
+    val offsetToDelete = MESSAGES_PRODUCED_PER_PARTITION + 5
+    val request: DeleteRecordsRequest = createDeleteRecordsRequestForTopicPartition(topicPartition, offsetToDelete)
+
+    // Call the API
+    val response = sendDeleteRecordsRequest(request, leaderId)
+    val partitionResult = response.data.topics.find(topicPartition.topic).partitions.find(topicPartition.partition)
+
+    // Validate the expected error code in the response
+    assertEquals(Errors.OFFSET_OUT_OF_RANGE.code(), partitionResult.errorCode(),
+      s"Unexpected error code received: ${Errors.forCode(partitionResult.errorCode()).name()}")
+  }
+
+  @Test
+  def testErrorWhenDeletingRecordsWithInvalidTopic(): Unit = {
+    val (_, leaderId: Int) = createTopicAndSendRecords
+
+    val invalidTopicPartition = new TopicPartition("invalid-topic", 0)
+    // Create the DeleteRecords request requesting deletion of an offset which is not present
+    val
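
In this earlier revision the producer is a test-class field and, per the ListOffsets*, IsolationLevel and CollectionConverters imports, deletion is verified by asking the leader for the partition's earliest offset rather than by inspecting the log. A sketch of what validateRecordsAreDeleted could look like under that approach (the helper body and the connectAndReceive/brokerSocketServer calls from BaseRequestTest are assumptions, not taken from the patch):

  // Hypothetical sketch, not part of the quoted patch: list the earliest offset of the partition via
  // a ListOffsets request to the leader and assert that it has advanced to the deleted offset.
  private def validateRecordsAreDeleted(topicPartition: TopicPartition, leaderId: Int, expectedStartOffset: Long): Unit = {
    val targetTopic = new ListOffsetsTopic()
      .setName(topicPartition.topic)
      .setPartitions(Collections.singletonList(new ListOffsetsPartition()
        .setPartitionIndex(topicPartition.partition)
        .setTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP)))
    val request = ListOffsetsRequest.Builder
      .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
      .setTargetTimes(Collections.singletonList(targetTopic))
      .build()
    val response = connectAndReceive[ListOffsetsResponse](request, destination = brokerSocketServer(leaderId))
    val partitionResponse = response.topics.asScala
      .find(_.name == topicPartition.topic).get
      .partitions.asScala
      .find(_.partitionIndex == topicPartition.partition).get
    assertEquals(expectedStartOffset, partitionResponse.offset)
  }

Compared with checking broker logs directly, a ListOffsets round trip observes deletion the same way a client would, which is presumably why this revision takes that route.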