hachikuji commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r621668165
########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ########## @@ -340,17 +343,18 @@ private FilterResult(ByteBuffer outputBuffer) { private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int numMessagesInBatch, boolean headerOnly) { int bytesRetained = headerOnly ? DefaultRecordBatch.RECORD_BATCH_OVERHEAD : retainedBatch.sizeInBytes(); updateRetainedBatchMetadata(retainedBatch.maxTimestamp(), retainedBatch.lastOffset(), - retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained); + retainedBatch.lastOffset(), retainedBatch.baseOffset(), numMessagesInBatch, bytesRetained); } private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset, - int messagesRetained, int bytesRetained) { + long baseOffset, int messagesRetained, int bytesRetained) { Review comment: nit: this looks misaligned ########## File path: core/src/main/scala/kafka/log/LogCleaner.scala ########## @@ -700,11 +716,18 @@ private[log] class Cleaner(val id: Int, // if any messages are to be retained, write them out val outputBuffer = result.outputBuffer if (outputBuffer.position() > 0) { + if (destSegment.isEmpty) { + // create a new segment with a suffix appended to the name of the log and indexes + destSegment = Some(LogCleaner.createNewCleanedSegment(log.dir, log.config, result.baseOffsetOfFirstBatch())) + transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex) + transactionMetadata.appendTransactionIndex() + } + outputBuffer.flip() val retained = MemoryRecords.readableRecords(outputBuffer) // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads // after `Log.replaceSegments` (which acquires the lock) is called - dest.append(largestOffset = result.maxOffset, + destSegment.get.append(largestOffset = result.maxOffset, Review comment: nit: we could probably do something like this to avoid the nasty `get` calls in here ```scala val 
segment = destSegment.getOrElse { val newSegment = LogCleaner.createNewCleanedSegment(log.dir, log.config, result.baseOffsetOfFirstBatch()) transactionMetadata.cleanedIndex = Some(newSegment.txnIndex) transactionMetadata.appendTransactionIndex() destSegment = Some(newSegment) newSegment } ``` ########## File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala ########## @@ -984,19 +1003,26 @@ class LogCleanerTest { def distinctValuesBySegment = log.logSegments.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq - val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment + val distinctValuesBySegmentBeforeClean = distinctValuesBySegment assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N), "Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.") + log.updateHighWatermark(log.activeSegment.baseOffset) cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset)) val distinctValuesBySegmentAfterClean = distinctValuesBySegment - assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean) - .take(numCleanableSegments).forall { case (before, after) => after < before }, + // One segment should have been completely deleted, so there will be fewer segments. + assertTrue(distinctValuesBySegmentAfterClean.size < distinctValuesBySegmentBeforeClean.size) + + // Drop the first segment from before cleaning since it was removed. Also subtract 1 from numCleanableSegments + val normalizedDistinctValuesBySegmentBeforeClean = distinctValuesBySegmentBeforeClean.drop(1) Review comment: The logic in this test case has become rather obscure after the change. Maybe we could do something simpler than comparing segment by segment. As far as I can tell, all the test is doing is ensuring that the first uncleanable offset is respected. 
Maybe a simpler test would just write the same key over and over and then assert that all records below the uncleanable offset are removed and all values above that offset are retained? ########## File path: core/src/main/scala/kafka/log/LogCleaner.scala ########## @@ -1161,6 +1191,14 @@ private[log] class CleanedTransactionMetadata { } } + /** + * Apply transactions that accumulated before cleanedIndex was applied + */ + def appendTransactionIndex(): Unit = { + toAppend.foreach(transaction => cleanedIndex.foreach(_.append(transaction))) + toAppend = ListBuffer.empty Review comment: Maybe we can just `clear()`? ########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ########## @@ -340,17 +343,18 @@ private FilterResult(ByteBuffer outputBuffer) { private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int numMessagesInBatch, boolean headerOnly) { int bytesRetained = headerOnly ? DefaultRecordBatch.RECORD_BATCH_OVERHEAD : retainedBatch.sizeInBytes(); updateRetainedBatchMetadata(retainedBatch.maxTimestamp(), retainedBatch.lastOffset(), - retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained); + retainedBatch.lastOffset(), retainedBatch.baseOffset(), numMessagesInBatch, bytesRetained); } private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset, - int messagesRetained, int bytesRetained) { + long baseOffset, int messagesRetained, int bytesRetained) { validateBatchMetadata(maxTimestamp, shallowOffsetOfMaxTimestamp, maxOffset); if (maxTimestamp > this.maxTimestamp) { this.maxTimestamp = maxTimestamp; this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp; } this.maxOffset = Math.max(maxOffset, this.maxOffset); + if (this.bytesRetained == 0) this.baseOffsetOfFirstBatch = baseOffset; Review comment: It seems like it would be more direct to check `baseOffsetOfFirstBatch < 0`. 
########## File path: core/src/main/scala/kafka/log/Log.scala ########## @@ -2659,3 +2672,9 @@ case object LogDeletion extends SegmentDeletionReason { log.info(s"Deleting segments as the log has been deleted: ${toDelete.mkString(",")}") } } + +case object SegmentCompaction extends SegmentDeletionReason { + override def logReason(log: Log, toDelete: List[LogSegment]): Unit = { + log.info(s"Deleting segments as all records were deleted in log compaction: ${toDelete.mkString(",")}") Review comment: nit: "deleted during log compaction"? ########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ########## @@ -386,6 +390,13 @@ public long maxOffset() { return maxOffset; } + /** + * @return the baseOffset of the first batch of retained records or -1 if no batches are retained + */ + public long baseOffsetOfFirstBatch() { Review comment: Would it be worth using `OptionalLong`? ########## File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ########## @@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(100L, latests.get(t1p0)) } + @Test + def testBeginningOffsetsOnCompactedTopic(): Unit = { + val topic0 = "topicWithoutCompaction" + val topic1 = "topicWithCompaction" + val t0p0 = new TopicPartition(topic0, 0) + val t1p0 = new TopicPartition(topic1, 0) + val t1p1 = new TopicPartition(topic1, 1) + val partitions = Set(t0p0, t1p0, t1p1).asJava + + val producerProps = new Properties() + // Each batch will hold about 10 records + producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256") + val producer = createProducer(configOverrides = producerProps) + // First topic will not have compaction. Simply a sanity test. 
+ createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, recordsPerPartition = 100) + Review comment: nit: unneeded newline ########## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ########## @@ -595,8 +595,8 @@ private[log] object LogCleanerManager extends Logging { // may be cleaned val firstUncleanableDirtyOffset: Long = Seq( - // we do not clean beyond the first unstable offset - log.firstUnstableOffset, + // we do not clean beyond the lastStableOffset Review comment: Maybe worth emphasizing that this also implies that we do not clean beyond the high watermark? ```scala // we do not clean beyond the last stable offset (and therefore the high watermark) ``` ########## File path: core/src/main/scala/kafka/log/LogCleaner.scala ########## @@ -700,11 +716,18 @@ private[log] class Cleaner(val id: Int, // if any messages are to be retained, write them out val outputBuffer = result.outputBuffer if (outputBuffer.position() > 0) { + if (destSegment.isEmpty) { + // create a new segment with a suffix appended to the name of the log and indexes + destSegment = Some(LogCleaner.createNewCleanedSegment(log.dir, log.config, result.baseOffsetOfFirstBatch())) Review comment: nit: unnecessary parentheses on `baseOffsetOfFirstBatch()` ########## File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ########## @@ -540,13 +540,34 @@ class LogCleanerManagerTest extends Logging { while(log.numberOfSegments < 8) log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0) + log.updateHighWatermark(log.activeSegment.baseOffset) val lastCleanOffset = Some(0L) val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds) assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.") assertEquals(log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable 
offset begins with the active segment.") } + @Test + def testCleanableOffsetsForNoneWithLowerHighWatermark(): Unit = { Review comment: nit: the name is not very clear. What does `None` refer to? ########## File path: core/src/main/scala/kafka/log/LogCleaner.scala ########## @@ -599,20 +607,27 @@ private[log] class Cleaner(val id: Int, currentSegmentOpt = nextSegmentOpt } - cleaned.onBecomeInactiveSegment() - // flush new segment to disk before swap - cleaned.flush() + cleanedSegment match { + case Some(cleaned) => + // Result of cleaning included at least one record. + cleaned.onBecomeInactiveSegment() + // flush new segment to disk before swap + cleaned.flush() - // update the modification date to retain the last modified date of the original files - val modified = segments.last.lastModified - cleaned.lastModified = modified + // update the modification date to retain the last modified date of the original files + val modified = segments.last.lastModified + cleaned.lastModified = modified - // swap in new segment - info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log") - log.replaceSegments(List(cleaned), segments) + // swap in new segment + info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log") + log.replaceSegments(List(cleaned), segments) + case None => + info(s"Deleting segment(s) $segments in log $log") Review comment: nit: maybe add some explanation? Maybe `Deleting segments $segments in log $log since no records were retained after cleaning`? 
########## File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ########## @@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(100L, latests.get(t1p0)) } + @Test + def testBeginningOffsetsOnCompactedTopic(): Unit = { + val topic0 = "topicWithoutCompaction" + val topic1 = "topicWithCompaction" + val t0p0 = new TopicPartition(topic0, 0) + val t1p0 = new TopicPartition(topic1, 0) + val t1p1 = new TopicPartition(topic1, 1) + val partitions = Set(t0p0, t1p0, t1p1).asJava + + val producerProps = new Properties() + // Each batch will hold about 10 records + producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256") + val producer = createProducer(configOverrides = producerProps) + // First topic will not have compaction. Simply a sanity test. + createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, recordsPerPartition = 100) + + + // Second topic will have compaction. + // The first partition will have compaction occur at offset 0 so beginningOffsets should be nonzero. + // The second partition will not have compaction occur at offset 0, so beginningOffsets will remain 0. + val props = new Properties() + props.setProperty(LogConfig.MaxCompactionLagMsProp, "1") + props.setProperty(LogConfig.CleanupPolicyProp, "compact") + props.setProperty(LogConfig.MinCleanableDirtyRatioProp, "0.01") + props.setProperty(LogConfig.SegmentBytesProp, "512") + + // Send records to first partition -- all duplicates. + createTopic(topic1, numPartitions = 2, replicationFactor = 1, props) + TestUtils.sendRecordsWithKey(producer, 100, 0L, new TopicPartition(topic1, 0), "key") + + // Send records fo second partition -- only last 50 records are duplicates. + sendRecords(producer, 50, t1p1) + TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 1), "key") + + // Sleep to allow compaction to take place. + Thread.sleep(25000) Review comment: Ouch. 25s of sleep is significant. 
I wonder if this test is overkill given the testing we already have in `LogCleanerTest`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org