jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r622505823



##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(100L, latests.get(t1p0))
   }
 
+  @Test
+  def testBeginningOffsetsOnCompactedTopic(): Unit = {
+    val topic0 = "topicWithoutCompaction"
+    val topic1 = "topicWithCompaction"
+    val t0p0 = new TopicPartition(topic0, 0)
+    val t1p0 = new TopicPartition(topic1, 0)
+    val t1p1 = new TopicPartition(topic1, 1)
+    val partitions = Set(t0p0, t1p0, t1p1).asJava
+
+    val producerProps = new Properties()
+    // Each batch will hold about 10 records
+    producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256")
+    val producer = createProducer(configOverrides = producerProps)
+    // First topic will not have compaction. Simply a sanity test.
+    createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, 
recordsPerPartition = 100)
+
+    
+    // Second topic will have compaction. 
+    // The first partition will have compaction occur at offset 0 so 
beginningOffsets should be nonzero.
+    // The second partition will not have compaction occur at offset 0, so 
beginningOffsets will remain 0.
+    val props = new Properties()
+    props.setProperty(LogConfig.MaxCompactionLagMsProp, "1")
+    props.setProperty(LogConfig.CleanupPolicyProp, "compact")
+    props.setProperty(LogConfig.MinCleanableDirtyRatioProp, "0.01")
+    props.setProperty(LogConfig.SegmentBytesProp, "512")
+
+    // Send records to first partition -- all duplicates.
+    createTopic(topic1, numPartitions = 2, replicationFactor = 1, props)
+    TestUtils.sendRecordsWithKey(producer, 100, 0L, new TopicPartition(topic1, 
0), "key")
+
+    // Send records fo second partition -- only last 50 records are duplicates.
+    sendRecords(producer, 50, t1p1)
+    TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 
1), "key")
+
+    // Sleep to allow compaction to take place.
+    Thread.sleep(25000)

Review comment:
       Sounds good. On further inspection looks like `log.cleaner.backoff.ms` 
is 15 seconds, which is why this is happening. I could try to figure out how to 
change this property for this test, but it may be easier to remove it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to