divijvaidya commented on code in PR #17840:
URL: https://github.com/apache/kafka/pull/17840#discussion_r1845404340
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1543,6 +1547,76 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertNull(returnedOffsets.get(topicPartition))
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testDeleteRecordsAfterCorruptRecords(quorum: String, groupProtocol:
String): Unit = {
+ val config = new Properties()
+ config.put("segment.ms", "1000")
+ createTopic(topic, numPartitions = 1, replicationFactor = 1, config)
+
+ client = createAdminClient
+
+ val consumer = createConsumer()
+ subscribeAndWaitForAssignment(topic, consumer)
+
+ val producer = createProducer()
Review Comment:
We need to close this producer and the other closeables (consumer, admin client) in a try-with-resources block so they are released even if the test fails.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1543,6 +1547,76 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertNull(returnedOffsets.get(topicPartition))
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testDeleteRecordsAfterCorruptRecords(quorum: String, groupProtocol:
String): Unit = {
+ val config = new Properties()
+ config.put("segment.ms", "1000")
Review Comment:
Can we change the logic to rotate segments based on size instead of time? That would
let us remove the `Thread.sleep` in this test.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1543,6 +1547,76 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertNull(returnedOffsets.get(topicPartition))
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testDeleteRecordsAfterCorruptRecords(quorum: String, groupProtocol:
String): Unit = {
Review Comment:
Should this test go in `DeleteRecordsRequestTest`?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1543,6 +1547,76 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertNull(returnedOffsets.get(topicPartition))
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testDeleteRecordsAfterCorruptRecords(quorum: String, groupProtocol:
String): Unit = {
+ val config = new Properties()
+ config.put("segment.ms", "1000")
+ createTopic(topic, numPartitions = 1, replicationFactor = 1, config)
+
+ client = createAdminClient
+
+ val consumer = createConsumer()
+ subscribeAndWaitForAssignment(topic, consumer)
+
+ val producer = createProducer()
+ def sendRecords(begin: Int, end: Int) = {
+ val futures = (begin until end).map( i => {
+ val record = new ProducerRecord(topicPartition.topic,
topicPartition.partition, s"$i".getBytes, s"$i".getBytes)
+ producer.send(record)
+ })
+ futures.foreach(_.get)
+ }
+ // create two segments, each have 10 records
+ sendRecords(0, 10)
+ Thread.sleep(1001)
+ sendRecords(10, 20)
+
+ val topicDesc =
client.describeTopics(Collections.singletonList(topicPartition.topic()))
+ .allTopicNames().get().get(topicPartition.topic())
+ assertEquals(1, topicDesc.partitions().size())
+ val partitionLeaderId = topicDesc.partitions().get(0).leader().id()
+ val logDirMap =
client.describeLogDirs(Collections.singletonList(partitionLeaderId))
+ .allDescriptions().get().get(partitionLeaderId)
+ val logDir = logDirMap.entrySet.stream
+ .filter(entry =>
entry.getValue.replicaInfos.containsKey(topicPartition)).findAny().get().getKey
+ // retrieve the path of the first segment
+ val logFilePath =
LogFileUtils.logFile(Paths.get(logDir).resolve(topicPartition.toString).toFile,
0).toPath
+ val records = FileRecords.open(logFilePath.toFile)
+ assertEquals(10, records.records().asScala.iterator.size)
+
+ // manually load the inactive segment file to corrupt the data
+ val originalContent = Files.readAllBytes(logFilePath)
+ val newContent = ByteBuffer.allocate(JLong.BYTES + Integer.BYTES +
originalContent.length)
+ newContent.putLong(0) // offset
+ newContent.putInt(0) // size -> this will make FileLogInputStream throw
"Found record size 0 smaller than minimum record..."
+ newContent.put(Files.readAllBytes(logFilePath))
+ newContent.flip()
+ Files.write(logFilePath, newContent.array(),
StandardOpenOption.TRUNCATE_EXISTING)
+
+ consumer.seekToBeginning(Collections.singletonList(topicPartition))
+ assertEquals("Encountered corrupt message when fetching offset 0 for
topic-partition topic-0",
+ assertThrows(classOf[KafkaException], () =>
consumer.poll(JDuration.ofMillis(DEFAULT_MAX_WAIT_MS))).getMessage)
+
+ val partitionFollowerId = brokers.map(b => b.config.nodeId).filter(id =>
id != partitionLeaderId).head
+ val newAssignment = Map(topicPartition -> Optional.of(new
NewPartitionReassignment(
+ List(Integer.valueOf(partitionLeaderId),
Integer.valueOf(partitionFollowerId)).asJava))).asJava
+
+ // add follower to topic partition
+ client.alterPartitionReassignments(newAssignment).all().get()
+ // delete records in corrupt segment (the first segment)
+ client.deleteRecords(Map(topicPartition ->
RecordsToDelete.beforeOffset(10L)).asJava).all.get
+ // verify reassignment is finished after delete records
+ TestUtils.waitUntilTrue(() => {
+ val isr =
client.describeTopics(TopicCollection.ofTopicNames(List(topicPartition.topic()).asJava))
+
.allTopicNames().get().get(topic).partitions().get(0).isr().asScala.map(node =>
node.id())
+ isr.equals(List(partitionLeaderId, partitionFollowerId))
+ }, "The isr is incorrect")
Review Comment:
Consider using `TestUtils.waitForBrokersInIsr`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]