ijuma commented on code in PR #18321:
URL: https://github.com/apache/kafka/pull/18321#discussion_r1907350932


##########
core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala:
##########
@@ -134,6 +135,131 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
     assertEquals(toMap(messages), toMap(read), "Contents of the map shouldn't change")
   }
 
+  @ParameterizedTest
+  @ArgumentsSource(classOf[LogCleanerParameterizedIntegrationTest.ExcludeZstd])
+  def testCleanerWithMessageFormatV0V1V2(compressionType: CompressionType): Unit = {
+    val compression = Compression.of(compressionType).build()
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0, compression)
+    val maxMessageSize = compression match {
+      case Compression.NONE => largeMessageSet.sizeInBytes
+      case _ =>
+        // the broker assigns absolute offsets for message format 0 which potentially causes the compressed size to
+        // increase because the broker offsets are larger than the ones assigned by the client
+        // adding `6` to the message set size is good enough for this test: it covers the increased message size while
+        // still being less than the overhead introduced by the conversion from message format version 0 to 1
+        largeMessageSet.sizeInBytes + 6

Review Comment:
   Removing it causes the test to fail with:
   
   > org.apache.kafka.common.errors.RecordTooLargeException: Message batch size is 194 bytes in append to partition log-0 which exceeds the maximum configured size of 190.
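   
   For reference, a rough sketch of the arithmetic behind the `+ 6` slack, using the numbers from the failure above (190 would be `largeMessageSet.sizeInBytes`, i.e. the configured maximum once the slack is removed, and 194 is the broker-side batch size after absolute offsets are assigned). The object name and the exact byte values are only illustrative:
   
   ```scala
   object SlackCheck {
     def main(args: Array[String]): Unit = {
       val clientBatchBytes = 190                 // largeMessageSet.sizeInBytes in the failing run
       val brokerBatchBytes = 194                 // size reported by RecordTooLargeException
       val maxMessageSize = clientBatchBytes + 6  // the `+ 6` slack applied by the test
   
       // without the slack the limit is 190 and the 194-byte broker batch is rejected;
       // with the slack the limit is 196 and the append succeeds
       assert(brokerBatchBytes > clientBatchBytes)
       assert(brokerBatchBytes <= maxMessageSize)
       println(s"maxMessageSize=$maxMessageSize covers a broker-side batch of $brokerBatchBytes bytes")
     }
   }
   ```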


