jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r622360387



##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(100L, latests.get(t1p0))
   }
 
+  @Test
+  def testBeginningOffsetsOnCompactedTopic(): Unit = {
+    val topic0 = "topicWithoutCompaction"
+    val topic1 = "topicWithCompaction"
+    val t0p0 = new TopicPartition(topic0, 0)
+    val t1p0 = new TopicPartition(topic1, 0)
+    val t1p1 = new TopicPartition(topic1, 1)
+    val partitions = Set(t0p0, t1p0, t1p1).asJava
+
+    val producerProps = new Properties()
+    // Each batch will hold about 10 records
+    producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256")
+    val producer = createProducer(configOverrides = producerProps)
+    // First topic will not have compaction. Simply a sanity test.
+    createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, 
recordsPerPartition = 100)
+
+    
+    // Second topic will have compaction. 
+    // The first partition will have compaction occur at offset 0 so 
beginningOffsets should be nonzero.
+    // The second partition will not have compaction occur at offset 0, so 
beginningOffsets will remain 0.
+    val props = new Properties()
+    props.setProperty(LogConfig.MaxCompactionLagMsProp, "1")
+    props.setProperty(LogConfig.CleanupPolicyProp, "compact")
+    props.setProperty(LogConfig.MinCleanableDirtyRatioProp, "0.01")
+    props.setProperty(LogConfig.SegmentBytesProp, "512")
+
+    // Send records to first partition -- all duplicates.
+    createTopic(topic1, numPartitions = 2, replicationFactor = 1, props)
+    TestUtils.sendRecordsWithKey(producer, 100, 0L, new TopicPartition(topic1, 
0), "key")
+
+    // Send records to second partition -- only last 50 records are duplicates.
+    sendRecords(producer, 50, t1p1)
+    TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 
1), "key")
+
+    // Sleep to allow compaction to take place.
+    Thread.sleep(25000)

Review comment:
       I was able to get the sleep down to 15 seconds consistently (50 passed tests), but 
once we get to 13 seconds, it starts flaking. I'm not sure if this is good enough to keep 
the test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to