My question is about conversions of produced batches (which break zero copy).
I am producing with Kafka Streams 3.0 (the same version as the Kafka cluster), and messages are compressed on the producer. On one topic I see several produce message conversions per second (the ProduceMessageConversionsPerSec broker metric). As we know, converting (recompressing) the batches breaks "zero copy", which means the brokers use more resources.

Checking the source code, I found several conditions under which the broker cannot keep the produced batches "in place":

1. Source compression != destination compression (e.g. the producer uses GZIP and the topic uses LZ4).
2. Magic number < 2, or a magic number mismatch.
3. Inner record offsets that are not contiguous (record.offset != expectedOffset).

I think the code that breaks zero copy for the offsets case is the snippet at the bottom of this email.

What I suspect is that, for some reason, the inner offsets of the produced batches are not contiguous. That leads me to my main question: in what scenario could this happen? I tried to check this with the kafka-dump-log.sh tool, but of course that is not possible, because the broker has already converted the batches before writing them to the log.

I also considered that transactions could be the reason for the conversions, since they use control batches (isControlBatch). However, as far as I can see, a control batch always contains exactly one record (the control record), so I assume control batches never carry other "client generated" records.

So I would appreciate it if you could give me an example of non-contiguous offsets within a batch. In parallel I will continue my investigation, as I am intrigued by these conversions. Afterwards I want to write a public document about the performance impact of batch conversions.

Thanks in advance.

Best regards,
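Sergio Troiano

-----------------------------------------------------------------

Code snippet (from the broker's validation of compressed batches):

```scala
// Excerpt: expectedInnerOffset, inPlaceAssignment, maxTimestamp, toMagic, etc.
// are defined earlier in the enclosing method.
// Each record's inner offset must equal the next expected value (0, 1, 2, ...);
// otherwise inPlaceAssignment is cleared and the batch is rebuilt instead of kept as is.
recordsIterator.forEachRemaining { record =>
  val expectedOffset = expectedInnerOffset.getAndIncrement()
  val recordError = validateRecordCompression(batchIndex, record).orElse {
    validateRecord(batch, topicPartition, record, batchIndex, now, timestampType,
      timestampDiffMaxMs, compactedTopic, brokerTopicStats).orElse {
      if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
        if (record.timestamp > maxTimestamp)
          maxTimestamp = record.timestamp

        // Some older clients do not implement the V1 internal offsets correctly.
        // Historically the broker handled this by rewriting the batches rather
        // than rejecting the request. We must continue this handling here to avoid
        // breaking these clients.
        if (record.offset != expectedOffset)
          inPlaceAssignment = false
      }
      None
    }
  }
  // ... (rest of the loop body not included in this excerpt)
}
```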
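To make the three conditions listed above concrete, here is a simplified, self-contained Scala model of them. It is only a sketch under assumptions: the names (InPlaceCheck, Codec, ProducedBatch, conversionReasons) are hypothetical, it is not the broker's code, and it assumes the broker's target magic (toMagic) is 2. It reports which condition would force the broker to rewrite a batch instead of keeping it as produced.

```scala
// Hypothetical, simplified model of the three in-place checks described above.
// NOT the broker's implementation; all names and types are invented for illustration.
object InPlaceCheck {
  sealed trait Codec
  case object NoCompression extends Codec
  case object Gzip extends Codec
  case object Lz4 extends Codec

  final val MagicV2: Byte = 2

  // A produced batch reduced to the fields the checks look at.
  // offsetDeltas: inner record offsets as written by the client (expected 0, 1, 2, ...).
  final case class ProducedBatch(magic: Byte, codec: Codec, offsetDeltas: Seq[Long])

  // Returns the reasons in-place assignment is ruled out; an empty list means
  // the batch could be appended as produced (zero copy preserved).
  def conversionReasons(batch: ProducedBatch, topicCodec: Codec, toMagic: Byte): List[String] = {
    // Condition 1: producer codec differs from the topic codec.
    val codecMismatch =
      if (batch.codec != topicCodec) List(s"codec ${batch.codec} != topic codec $topicCodec")
      else Nil
    // Condition 2: magic mismatch (with toMagic = 2 this also catches any magic < 2).
    val magicMismatch =
      if (batch.magic != toMagic) List(s"magic ${batch.magic} (broker expects $toMagic)")
      else Nil
    // Condition 3: first inner offset that breaks the 0, 1, 2, ... sequence.
    // Simplification: the real check also requires magic > 0, which holds for magic 2.
    val firstGap = batch.offsetDeltas.zipWithIndex.collectFirst {
      case (offset, expected) if offset != expected.toLong =>
        s"offset $offset where $expected was expected"
    }
    codecMismatch ++ magicMismatch ++ firstGap.toList
  }

  def main(args: Array[String]): Unit = {
    val contiguous = ProducedBatch(MagicV2, Lz4, Seq(0L, 1L, 2L))
    val withGap = ProducedBatch(MagicV2, Lz4, Seq(0L, 1L, 3L))
    val gzipIntoLz4Topic = ProducedBatch(MagicV2, Gzip, Seq(0L, 1L))
    println(conversionReasons(contiguous, Lz4, MagicV2))
    println(conversionReasons(withGap, Lz4, MagicV2))
    println(conversionReasons(gzipIntoLz4Topic, Lz4, MagicV2))
  }
}
```

The second batch (inner offsets 0, 1, 3) is the kind of gap the `record.offset != expectedOffset` check reacts to; the model does not explain why a real client would produce such a batch, which is the open question here.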