divijvaidya commented on code in PR #18140:
URL: https://github.com/apache/kafka/pull/18140#discussion_r1880421924
##########
core/src/test/java/kafka/admin/DeleteTopicTest.java:
##########
@@ -239,7 +239,7 @@ public void testDeleteNonExistingTopic(ClusterInstance
cluster) throws Exception
@ClusterTest(serverProperties = {
@ClusterConfigProperty(key = "log.cleaner.enable", value = "true"),
@ClusterConfigProperty(key = "log.cleanup.policy", value = "compact"),
- @ClusterConfigProperty(key = "log.segment.bytes", value = "100"),
+ @ClusterConfigProperty(key = "log.segment.bytes", value = "1048576"),
Review Comment:
Does this test really need this config? From what I can tell, it does the
following:
1. Creates a topic
2. Writes 100 x 3 entries, where 100 entries are duplicated
3. Waits for cleaning to complete up to offset 0 <-- this looks wrong
4. Deletes the topic
5. Verifies topic deletion
Why do we need a segment roll here? (Unless I am missing something.)
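For context on the segment-roll question, here is a rough sketch of how the segment size changes the number of rolls for the writes listed above. It is a simplified model, not Kafka's actual roll logic: the fixed 70-byte record size is a hypothetical figure, and `SegmentRollSketch` and `countSegments` are illustrative names only.

```java
// Simplified model of log segment rolling; NOT Kafka's actual implementation.
final class SegmentRollSketch {

    /**
     * Counts the segments that exist after appending recordCount records of
     * recordBytes each, rolling whenever the next record would push the
     * active segment past segmentBytes.
     */
    static int countSegments(int segmentBytes, int recordBytes, int recordCount) {
        int segments = 1;      // there is always one active segment
        long activeSize = 0;   // bytes written to the active segment so far
        for (int i = 0; i < recordCount; i++) {
            if (activeSize > 0 && activeSize + recordBytes > segmentBytes) {
                segments++;    // roll: start a new active segment
                activeSize = 0;
            }
            activeSize += recordBytes;
        }
        return segments;
    }

    public static void main(String[] args) {
        // 70 bytes per record is an assumed size, chosen only for illustration.
        System.out.println(countSegments(100, 70, 300));      // prints 300
        System.out.println(countSegments(1048576, 70, 300));  // prints 1
    }
}
```

Under this model, `log.segment.bytes=100` forces a roll on nearly every append, while 1048576 leaves all 300 records in the single active segment. If the test relies on the cleaner processing rolled segments, that difference may matter.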
##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -625,7 +626,7 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
"Config not updated in LogManager")
val log = servers.head.logManager.getLog(new TopicPartition(topic,
0)).getOrElse(throw new IllegalStateException("Log not found"))
- TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing
topic config using defaults not updated")
+ TestUtils.waitUntilTrue(() => log.config.segmentSize == 1048576, "Existing
topic config using defaults not updated")
Review Comment:
You will need to change the validation on line 641 as well,
`log.logSegments.asScala.exists(_.size > 3000)`, to retain the original
intention of this test.
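One possible reading of "retain the original intention" is to keep the size check in the same proportion to the segment size as before (3000 out of 4000). The sketch below only works through that arithmetic; the 3:4 ratio is an assumption about the intent, and `SegmentThresholdSketch` and `scaledThreshold` are hypothetical names, not part of the test.

```java
// Arithmetic sketch only; the 3:4 ratio is an assumed reading of the test's intent.
final class SegmentThresholdSketch {
    // Original pairing in the test: segment size 4000, check for a segment > 3000.
    static final int OLD_SEGMENT_BYTES = 4000;
    static final int OLD_THRESHOLD = 3000;

    /** Scales the old threshold to a new segment size, keeping the same ratio. */
    static long scaledThreshold(int newSegmentBytes) {
        // Widen to long first so the multiplication cannot overflow an int.
        return (long) newSegmentBytes * OLD_THRESHOLD / OLD_SEGMENT_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(scaledThreshold(4000));     // prints 3000
        System.out.println(scaledThreshold(1048576));  // prints 786432
    }
}
```

Whether the test can actually write enough data to produce a segment of that size is a separate question that this sketch does not answer.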
##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -610,11 +610,12 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG,
"1000")
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
"1000")
props.put(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG,
"false")
- reconfigureServers(props, perBrokerConfig = false,
(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000"))
+ reconfigureServers(props, perBrokerConfig = false,
(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576"))
// Verify that all broker defaults have been updated
servers.foreach { server =>
props.forEach { (k, v) =>
+ TestUtils.waitUntilTrue(() => server.config.originals.get(k) != null,
"Configs not present")
Review Comment:
Thank you for adding this.
##########
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##########
@@ -610,11 +610,12 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG,
"1000")
props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG,
"1000")
props.put(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG,
"false")
- reconfigureServers(props, perBrokerConfig = false,
(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000"))
+ reconfigureServers(props, perBrokerConfig = false,
(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576"))
Review Comment:
This test doesn't really care about the new value that we are reconfiguring
it to, as long as it is a valid value. It just tests whether the
reconfiguration is successful.
Could we set it to something like 2GB? That would ensure that if we change
the constraint again in the future, we wouldn't have to modify this test.
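A small caveat on the exact number, sketched below: assuming the segment size is parsed as a 32-bit signed integer (which I believe is the case for `log.segment.bytes`, but please verify against the config definition), exactly 2 GiB does not fit and the largest usable value is `Integer.MAX_VALUE`. `SegmentBytesLimitSketch` and `parsesAsInt` are illustrative names only.

```java
// Shows why "2GB" needs care if the config value is a 32-bit signed int (assumption).
final class SegmentBytesLimitSketch {
    static final long TWO_GIB = 2L * 1024 * 1024 * 1024; // 2147483648

    /** Returns true if the string parses as a 32-bit signed integer. */
    static boolean parsesAsInt(String value) {
        try {
            Integer.parseInt(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(TWO_GIB > Integer.MAX_VALUE);                        // prints true
        System.out.println(parsesAsInt(Long.toString(TWO_GIB)));                // prints false
        System.out.println(parsesAsInt(Integer.toString(Integer.MAX_VALUE)));   // prints true
    }
}
```

So "something like 2GB" would in practice mean a value at or just below 2147483647 under that assumption.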
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1188,7 +1188,7 @@ class KafkaConfigTest {
case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG =>
assertDynamic(kafkaConfigProp, "5", () =>
config.zstdCompressionLevel)
case TopicConfig.SEGMENT_BYTES_CONFIG =>
- assertDynamic(kafkaConfigProp, 10000, () => config.logSegmentBytes)
+ assertDynamic(kafkaConfigProp, 1048576, () => config.logSegmentBytes)
Review Comment:
It doesn't have to be the minimum possible value. A very large value like
2GB will ensure that we don't have to change it in the future.
##########
core/src/test/scala/unit/kafka/server/LogOffsetTest.scala:
##########
@@ -224,14 +224,14 @@ class LogOffsetTest extends BaseRequestTest {
val now = Time.SYSTEM.milliseconds + 30000 // pretend it is the future to
avoid race conditions with the fs
val offsets = log.legacyFetchOffsetsBefore(now, 15)
- assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L),
offsets)
+ assertEquals(Seq(20L, 0L), offsets)
TestUtils.waitUntilTrue(() => isLeaderLocalOnBroker(topic,
topicPartition.partition, broker),
"Leader should be elected")
val request = ListOffsetsRequest.Builder.forReplica(0, 0)
.setTargetTimes(buildTargetTimes(topicPartition, now, 15).asJava).build()
val consumerOffsets =
findPartition(sendListOffsetsRequest(request).topics.asScala,
topicPartition).oldStyleOffsets.asScala
- assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L),
consumerOffsets)
Review Comment:
Can you please help me understand this change?
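One possible explanation, sketched under the assumption that `legacyFetchOffsetsBefore` reports each segment's base offset plus the log end offset, newest first: with the old small segment size the 20 records were spread over ten segments of two records each, whereas with the larger size they all sit in one segment. The model below is not Kafka code, and `LegacyOffsetsSketch` and `offsetsBefore` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of "segment base offsets plus log end offset"; NOT Kafka's implementation.
final class LegacyOffsetsSketch {

    /**
     * Returns, in descending order and capped at maxOffsets, the base offset of
     * every segment plus the log end offset, for a log of totalRecords records
     * split into segments of recordsPerSegment records each.
     */
    static List<Long> offsetsBefore(int totalRecords, int recordsPerSegment, int maxOffsets) {
        if (totalRecords <= 0 || recordsPerSegment <= 0) {
            throw new IllegalArgumentException("counts must be positive");
        }
        List<Long> offsets = new ArrayList<>();
        for (long base = 0; base < totalRecords; base += recordsPerSegment) {
            offsets.add(base);               // base offset of each segment
        }
        offsets.add((long) totalRecords);    // log end offset
        offsets.sort(Collections.reverseOrder());
        return new ArrayList<>(offsets.subList(0, Math.min(maxOffsets, offsets.size())));
    }

    public static void main(String[] args) {
        System.out.println(offsetsBefore(20, 2, 15));   // ten small segments
        System.out.println(offsetsBefore(20, 20, 15));  // one large segment
    }
}
```

In this model the first call yields the eleven offsets from 20 down to 0 in steps of two, and the second yields only 20 and 0, which matches the old and new expectations in the diff. The PR author would need to confirm that this is the actual reason.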