hachikuji commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r621668165



##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -340,17 +343,18 @@ private FilterResult(ByteBuffer outputBuffer) {
         private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int numMessagesInBatch, boolean headerOnly) {
             int bytesRetained = headerOnly ? DefaultRecordBatch.RECORD_BATCH_OVERHEAD : retainedBatch.sizeInBytes();
             updateRetainedBatchMetadata(retainedBatch.maxTimestamp(), retainedBatch.lastOffset(),
-                    retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained);
+                    retainedBatch.lastOffset(), retainedBatch.baseOffset(), numMessagesInBatch, bytesRetained);
         }
 
         private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset,
-                                                int messagesRetained, int bytesRetained) {
+                                                long baseOffset, int messagesRetained, int bytesRetained) {

Review comment:
       nit: this looks misaligned

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -700,11 +716,18 @@ private[log] class Cleaner(val id: Int,
       // if any messages are to be retained, write them out
       val outputBuffer = result.outputBuffer
       if (outputBuffer.position() > 0) {
+        if (destSegment.isEmpty) {
+          // create a new segment with a suffix appended to the name of the log and indexes
+          destSegment = Some(LogCleaner.createNewCleanedSegment(log.dir, log.config, result.baseOffsetOfFirstBatch()))
+          transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex)
+          transactionMetadata.appendTransactionIndex()
+        }
+        
         outputBuffer.flip()
         val retained = MemoryRecords.readableRecords(outputBuffer)
         // it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
         // after `Log.replaceSegments` (which acquires the lock) is called
-        dest.append(largestOffset = result.maxOffset,
+        destSegment.get.append(largestOffset = result.maxOffset,

Review comment:
       nit: we could probably do something like this to avoid the nasty `get` 
calls in here
   ```scala
   val segment = destSegment.getOrElse {
  val newSegment = LogCleaner.createNewCleanedSegment(log.dir, log.config, result.baseOffsetOfFirstBatch())
     transactionMetadata.cleanedIndex = Some(newSegment.txnIndex)
     transactionMetadata.appendTransactionIndex()
     destSegment = Some(newSegment)
     newSegment
   }
   ```

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
##########
@@ -984,19 +1003,26 @@ class LogCleanerTest {
 
     def distinctValuesBySegment = log.logSegments.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq
 
-    val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
+    val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
     assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
       "Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.")
 
+    log.updateHighWatermark(log.activeSegment.baseOffset)
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset))
 
     val distinctValuesBySegmentAfterClean = distinctValuesBySegment
 
-    assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
-      .take(numCleanableSegments).forall { case (before, after) => after < before },
+    // One segment should have been completely deleted, so there will be fewer segments.
+    assertTrue(distinctValuesBySegmentAfterClean.size < distinctValuesBySegmentBeforeClean.size)
+
+    // Drop the first segment from before cleaning since it was removed. Also subtract 1 from numCleanableSegments
+    val normalizedDistinctValuesBySegmentBeforeClean = distinctValuesBySegmentBeforeClean.drop(1)

Review comment:
       The logic in this test case has become rather obscure after the change. 
Maybe we could do something simpler than comparing segment by segment. As far 
as I can tell, all the test is doing is ensuring that the first uncleanable 
offset is respected. Maybe a simpler test would just write the same key over 
and over and then assert that all records below the uncleanable offset are 
removed and all values above that offset are retained?
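
       As a rough sketch of that simpler shape (this reuses the `makeLog`, `makeCleaner`, and `record` helpers and imports already present in `LogCleanerTest`; the segment counts and exact assertions are illustrative, not a final test):
   ```scala
   val cleaner = makeCleaner(Int.MaxValue)
   val logProps = new Properties()
   logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
   val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))

   // Write the same key over and over so every record except the latest one is a duplicate.
   while (log.numberOfSegments < 4)
     log.appendAsLeader(record(0, log.logEndOffset.toInt), leaderEpoch = 0)

   val firstUncleanableOffset = log.logSegments.toSeq(2).baseOffset
   log.updateHighWatermark(log.activeSegment.baseOffset)
   cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0, firstUncleanableOffset))

   val retainedOffsets = log.logSegments.flatMap(_.log.records.asScala.map(_.offset)).toSeq
   // Below the uncleanable offset only the latest occurrence of the key should survive...
   assertEquals(Seq(firstUncleanableOffset - 1), retainedOffsets.filter(_ < firstUncleanableOffset))
   // ...and everything at or above it should be untouched.
   assertEquals((firstUncleanableOffset until log.logEndOffset).toSeq, retainedOffsets.filter(_ >= firstUncleanableOffset))
   ```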

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -1161,6 +1191,14 @@ private[log] class CleanedTransactionMetadata {
     }
   }
 
+  /**
+   * Apply transactions that accumulated before cleanedIndex was applied
+   */
+  def appendTransactionIndex(): Unit = {
+    toAppend.foreach(transaction => cleanedIndex.foreach(_.append(transaction)))
+    toAppend = ListBuffer.empty

Review comment:
       Maybe we can just `clear()`?
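
       For illustration, a minimal sketch of that variant, assuming `toAppend` is the mutable `ListBuffer` used above (behavior unchanged; the buffer is just emptied in place rather than reassigned):
   ```scala
   def appendTransactionIndex(): Unit = {
     toAppend.foreach(transaction => cleanedIndex.foreach(_.append(transaction)))
     // empty the existing ListBuffer in place instead of allocating a new one
     toAppend.clear()
   }
   ```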

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -340,17 +343,18 @@ private FilterResult(ByteBuffer outputBuffer) {
         private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int numMessagesInBatch, boolean headerOnly) {
             int bytesRetained = headerOnly ? DefaultRecordBatch.RECORD_BATCH_OVERHEAD : retainedBatch.sizeInBytes();
             updateRetainedBatchMetadata(retainedBatch.maxTimestamp(), retainedBatch.lastOffset(),
-                    retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained);
+                    retainedBatch.lastOffset(), retainedBatch.baseOffset(), numMessagesInBatch, bytesRetained);
         }
 
         private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset,
-                                                int messagesRetained, int bytesRetained) {
+                                                long baseOffset, int messagesRetained, int bytesRetained) {
             validateBatchMetadata(maxTimestamp, shallowOffsetOfMaxTimestamp, maxOffset);
             if (maxTimestamp > this.maxTimestamp) {
                 this.maxTimestamp = maxTimestamp;
                 this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
             }
             this.maxOffset = Math.max(maxOffset, this.maxOffset);
+            if (this.bytesRetained == 0) this.baseOffsetOfFirstBatch = baseOffset;

Review comment:
       It seems like it would be more direct to check `baseOffsetOfFirstBatch < 
0`.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2659,3 +2672,9 @@ case object LogDeletion extends SegmentDeletionReason {
     log.info(s"Deleting segments as the log has been deleted: ${toDelete.mkString(",")}")
   }
 }
+
+case object SegmentCompaction extends SegmentDeletionReason {
+  override def logReason(log: Log, toDelete: List[LogSegment]): Unit = {
+    log.info(s"Deleting segments as all records were deleted in log compaction: ${toDelete.mkString(",")}")

Review comment:
       nit: "deleted during log compaction"?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -386,6 +390,13 @@ public long maxOffset() {
             return maxOffset;
         }
 
+        /**
+         * @return  the baseOffset of the first batch of retained records or -1 if no batches are retained
+         */
+        public long baseOffsetOfFirstBatch() {

Review comment:
       Would it be worth using `OptionalLong`?

##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(100L, latests.get(t1p0))
   }
 
+  @Test
+  def testBeginningOffsetsOnCompactedTopic(): Unit = {
+    val topic0 = "topicWithoutCompaction"
+    val topic1 = "topicWithCompaction"
+    val t0p0 = new TopicPartition(topic0, 0)
+    val t1p0 = new TopicPartition(topic1, 0)
+    val t1p1 = new TopicPartition(topic1, 1)
+    val partitions = Set(t0p0, t1p0, t1p1).asJava
+
+    val producerProps = new Properties()
+    // Each batch will hold about 10 records
+    producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256")
+    val producer = createProducer(configOverrides = producerProps)
+    // First topic will not have compaction. Simply a sanity test.
+    createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, recordsPerPartition = 100)
+

Review comment:
       nit: unneeded newline

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -595,8 +595,8 @@ private[log] object LogCleanerManager extends Logging {
     // may be cleaned
     val firstUncleanableDirtyOffset: Long = Seq(
 
-      // we do not clean beyond the first unstable offset
-      log.firstUnstableOffset,
+      // we do not clean beyond the lastStableOffset

Review comment:
       Maybe worth emphasizing that this also implies that we do not clean 
beyond the high watermark?
   ```scala
   // we do not clean beyond the last stable offset (and therefore the high watermark)
   ```

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -700,11 +716,18 @@ private[log] class Cleaner(val id: Int,
       // if any messages are to be retained, write them out
       val outputBuffer = result.outputBuffer
       if (outputBuffer.position() > 0) {
+        if (destSegment.isEmpty) {
+          // create a new segment with a suffix appended to the name of the log and indexes
+          destSegment = Some(LogCleaner.createNewCleanedSegment(log.dir, log.config, result.baseOffsetOfFirstBatch()))

Review comment:
       nit: unnecessary parenthesis `baseOffsetOfFirstBatch()`

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -540,13 +540,34 @@ class LogCleanerManagerTest extends Logging {
 
     while(log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
+    log.updateHighWatermark(log.activeSegment.baseOffset)
 
     val lastCleanOffset = Some(0L)
     val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
     assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
     assertEquals(log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with the active segment.")
   }
 
+  @Test
+  def testCleanableOffsetsForNoneWithLowerHighWatermark(): Unit = {

Review comment:
       nit: the name is not very clear. What does `None` refer to?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -599,20 +607,27 @@ private[log] class Cleaner(val id: Int,
         currentSegmentOpt = nextSegmentOpt
       }
 
-      cleaned.onBecomeInactiveSegment()
-      // flush new segment to disk before swap
-      cleaned.flush()
+      cleanedSegment match {
+        case Some(cleaned) =>
+          // Result of cleaning included at least one record.
+          cleaned.onBecomeInactiveSegment()
+          // flush new segment to disk before swap
+          cleaned.flush()
 
-      // update the modification date to retain the last modified date of the original files
-      val modified = segments.last.lastModified
-      cleaned.lastModified = modified
+          // update the modification date to retain the last modified date of the original files
+          val modified = segments.last.lastModified
+          cleaned.lastModified = modified
 
-      // swap in new segment
-      info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
-      log.replaceSegments(List(cleaned), segments)
+          // swap in new segment
+          info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
+          log.replaceSegments(List(cleaned), segments)
+        case None =>
+          info(s"Deleting segment(s) $segments in log $log")

Review comment:
       nit: maybe add some explanation? Maybe `Deleting segments $segments in log $log since no records were retained after cleaning`?

##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1267,6 +1266,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(100L, latests.get(t1p0))
   }
 
+  @Test
+  def testBeginningOffsetsOnCompactedTopic(): Unit = {
+    val topic0 = "topicWithoutCompaction"
+    val topic1 = "topicWithCompaction"
+    val t0p0 = new TopicPartition(topic0, 0)
+    val t1p0 = new TopicPartition(topic1, 0)
+    val t1p1 = new TopicPartition(topic1, 1)
+    val partitions = Set(t0p0, t1p0, t1p1).asJava
+
+    val producerProps = new Properties()
+    // Each batch will hold about 10 records
+    producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "256")
+    val producer = createProducer(configOverrides = producerProps)
+    // First topic will not have compaction. Simply a sanity test.
+    createTopicAndSendRecords(producer, topicName = topic0, numPartitions = 1, recordsPerPartition = 100)
+
+    
+    // Second topic will have compaction. 
+    // The first partition will have compaction occur at offset 0 so beginningOffsets should be nonzero.
+    // The second partition will not have compaction occur at offset 0, so beginningOffsets will remain 0.
+    val props = new Properties()
+    props.setProperty(LogConfig.MaxCompactionLagMsProp, "1")
+    props.setProperty(LogConfig.CleanupPolicyProp, "compact")
+    props.setProperty(LogConfig.MinCleanableDirtyRatioProp, "0.01")
+    props.setProperty(LogConfig.SegmentBytesProp, "512")
+
+    // Send records to first partition -- all duplicates.
+    createTopic(topic1, numPartitions = 2, replicationFactor = 1, props)
+    TestUtils.sendRecordsWithKey(producer, 100, 0L, new TopicPartition(topic1, 0), "key")
+
+    // Send records fo second partition -- only last 50 records are duplicates.
+    sendRecords(producer, 50, t1p1)
+    TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 1), "key")
+
+    // Sleep to allow compaction to take place.
+    Thread.sleep(25000)

Review comment:
       Ouch. 25s of sleep is significant. I wonder if this test is overkill 
given the testing we have in `LogCleanerTest`?
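
       If the end-to-end test is kept, one way to avoid the fixed 25s sleep might be to poll until compaction has actually advanced the start offset. A rough sketch (the condition, timeout, and use of `TestUtils.waitUntilTrue` here are assumptions, not part of the PR):
   ```scala
   // Hypothetical replacement for Thread.sleep(25000): wait until the cleaner has
   // deleted the fully-compacted head of t1p0, i.e. its beginning offset moves past 0.
   val consumer = createConsumer()
   TestUtils.waitUntilTrue(
     () => consumer.beginningOffsets(Set(t1p0).asJava).get(t1p0) > 0L,
     s"Timed out waiting for compaction to advance the beginning offset of $t1p0",
     waitTimeMs = 30000L)
   ```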



