hachikuji commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r609139321



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -760,6 +768,12 @@ class Log(@volatile private var _dir: File,
      // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
      // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
      // do a replace with an existing segment.
+      //
+      // For case 1 (log cleaning), we may have old segments before or after the swap segment that were cleaned.
+      // Unfortunately, since the baseOffset and the readNextOffset were changed, these segments will not be removed on
+      // recovery if they were not yet given a DeletedFileSuffix. A subsequent cleaning that succeeds will correctly remove these segments.
+      // ie. segments [0, 1000), [1000, 2000), [2000, 3000) cleaned into [1500, 1750).swap without marking old segments with DeletedFileSuffix

Review comment:
       Maybe this would be a more interesting example:
   ```
    // ie. segments [0, 1000), [1000, 2000), [2000, 3000), [3000, 4000) cleaned into [1500, 2500).swap without marking old segments with DeletedFileSuffix
   ```
       The filter logic below would capture both [1000, 2000) and [2000, 3000) as the old segments which will be replaced with [1500, 2500). Do I have that right? So in other words, the recovery logic replaces any overlapping segments.
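       For reference, a toy sketch of the overlap rule described above (illustrative only, not the actual `Log.scala` logic; the names are made up):
    ```scala
    object OverlapSketch {
      // A segment [baseOffset, nextOffset) overlaps the swap range if it starts before the
      // swap's end offset and ends after the swap's base offset.
      final case class SegmentRange(baseOffset: Long, nextOffset: Long)

      def overlapping(existing: Seq[SegmentRange], swap: SegmentRange): Seq[SegmentRange] =
        existing.filter(s => s.baseOffset < swap.nextOffset && s.nextOffset > swap.baseOffset)

      def main(args: Array[String]): Unit = {
        val segments = Seq(SegmentRange(0, 1000), SegmentRange(1000, 2000),
                           SegmentRange(2000, 3000), SegmentRange(3000, 4000))
        // prints List(SegmentRange(1000,2000), SegmentRange(2000,3000))
        println(overlapping(segments, SegmentRange(1500, 2500)))
      }
    }
    ```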

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -760,6 +768,12 @@ class Log(@volatile private var _dir: File,
      // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
      // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
      // do a replace with an existing segment.
+      //
+      // For case 1 (log cleaning), we may have old segments before or after the swap segment that were cleaned.
+      // Unfortunately, since the baseOffset and the readNextOffset were changed, these segments will not be removed on
+      // recovery if they were not yet given a DeletedFileSuffix. A subsequent cleaning that succeeds will correctly remove these segments.
+      // ie. segments [0, 1000), [1000, 2000), [2000, 3000) cleaned into [1500, 1750).swap without marking old segments with DeletedFileSuffix
+      // -> [0. 1000), [1500, 1750), [2000, 3000)

Review comment:
       nit: `[0, 1000)`

##########
File path: core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
##########
@@ -116,6 +116,18 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
     records
   }
 
+  protected def sendDuplicateRecords(producer: KafkaProducer[Array[Byte], Array[Byte]], numRecords: Int,

Review comment:
       How about renaming this to `sendRecordsWithKey` and including the key as a parameter? Otherwise it is a little strange as a general utility since it is tailored so narrowly for compaction. Also, perhaps we could consider moving this to `TestUtils`?
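       A rough sketch of the suggested shape (the name, parameter list, and default timestamp here are assumptions for illustration, not the final API):
    ```scala
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.kafka.common.TopicPartition

    // Hypothetical helper: sends numRecords records that all share the given key, so a
    // compacted topic will eventually retain only the last one. Sketch only.
    def sendRecordsWithKey(producer: KafkaProducer[Array[Byte], Array[Byte]],
                           numRecords: Int,
                           key: Array[Byte],
                           tp: TopicPartition,
                           startingTimestamp: Long = System.currentTimeMillis()): Seq[ProducerRecord[Array[Byte], Array[Byte]]] = {
      val records = (0 until numRecords).map { i =>
        new ProducerRecord(tp.topic(), tp.partition(), startingTimestamp + i, key, s"value $i".getBytes)
      }
      records.foreach(record => producer.send(record))
      producer.flush()
      records
    }
    ```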

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int,
       // if any messages are to be retained, write them out
       val outputBuffer = result.outputBuffer
       if (outputBuffer.position() > 0) {
+        if (destSegment.isEmpty) {
+          // create a new segment with a suffix appended to the name of the log and indexes
+          destSegment = Some(LogCleaner.createNewCleanedSegment(log, result.minOffset()))
+          transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex)

Review comment:
       Sorry to keep pressing this, but do you have an update here? It would be nice to keep the behavior here on par with what is upstream. Really, if you think about the way this is working, we could write to the cleaned segment directly from `filterTo` instead of building a buffer in memory, but we can leave that as a follow-up improvement.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -600,20 +608,31 @@ private[log] class Cleaner(val id: Int,
         currentSegmentOpt = nextSegmentOpt
       }
 
-      cleaned.onBecomeInactiveSegment()
-      // flush new segment to disk before swap
-      cleaned.flush()
+      cleanedSegment match {
+        case Some(cleaned) => {

Review comment:
       nit: braces are unnecessary. Similar for `None` case
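       i.e. the same code from the patch, just without the braces around each case body:
    ```scala
    cleanedSegment match {
      case Some(cleaned) =>
        // Result of cleaning included at least one record.
        cleaned.onBecomeInactiveSegment()
        // flush new segment to disk before swap
        cleaned.flush()

        // update the modification date to retain the last modified date of the original files
        val modified = segments.last.lastModified
        cleaned.lastModified = modified

        // swap in new segment
        info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
        log.replaceSegments(List(cleaned), segments)
      case None =>
        info(s"Deleting segment(s) $segments in log $log")
        log.deleteSegments(segments, SegmentCompaction)
    }
    ```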

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -760,6 +768,12 @@ class Log(@volatile private var _dir: File,
      // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
      // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
       // do a replace with an existing segment.
+      //
+      // For case 1 (log cleaning), we may have old segments before or after the swap segment that were cleaned.

Review comment:
       Can we clarify the comment above that "the swap segment must fall within the range of existing segment(s)"? I find it a bit confusing in the context of the new behavior.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2393,6 +2407,10 @@ class Log(@volatile private var _dir: File,
       }
       // okay we are safe now, remove the swap suffix
       sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+
+      // If not recovered swap file we need to increment logStartOffset here. Otherwise, we do this when loading the log.
+      if (!isRecoveredSwapFile)

Review comment:
       Can you explain why this is necessary? I understand that there is logic to initialize the log start offset after loading segments, but why do we need a special check to prevent updating the log start offset here?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -600,20 +608,31 @@ private[log] class Cleaner(val id: Int,
         currentSegmentOpt = nextSegmentOpt
       }
 
-      cleaned.onBecomeInactiveSegment()
-      // flush new segment to disk before swap
-      cleaned.flush()
+      cleanedSegment match {
+        case Some(cleaned) => {
+          // Result of cleaning included at least one record.
+          cleaned.onBecomeInactiveSegment()
+          // flush new segment to disk before swap
+          cleaned.flush()
 
-      // update the modification date to retain the last modified date of the original files
-      val modified = segments.last.lastModified
-      cleaned.lastModified = modified
+          // update the modification date to retain the last modified date of the original files
+          val modified = segments.last.lastModified
+          cleaned.lastModified = modified
 
-      // swap in new segment
-      // swap in new segment
-      info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
-      log.replaceSegments(List(cleaned), segments)
+          // swap in new segment
+          info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
+          log.replaceSegments(List(cleaned), segments)
+        }
+        case None => {
+          info(s"Deleting segment(s) $segments in log $log")
+          log.deleteSegments(segments, SegmentCompaction)
+        }
+      }
     } catch {
       case e: LogCleaningAbortedException =>
-        try cleaned.deleteIfExists()
+          try if (cleanedSegment.isDefined) {

Review comment:
       nit: this alignment looks off. Also, can we do a `foreach`?
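       Something along these lines, for instance (a sketch, assuming the rest of the abort handling stays as in upstream, i.e. suppressing a delete failure and rethrowing):
    ```scala
    case e: LogCleaningAbortedException =>
      try cleanedSegment.foreach(_.deleteIfExists())
      catch {
        case deleteException: Exception =>
          e.addSuppressed(deleteException)
      } finally throw e
    ```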

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -601,6 +601,9 @@ private[log] object LogCleanerManager extends Logging {
       // the active segment is always uncleanable
       Option(log.activeSegment.baseOffset),
 
+      // we do not want to clean past the high watermark
+      Option(log.highWatermark),

Review comment:
       I agree this probably makes sense. It is surprising that it was not 
there already. Was there a specific reason related to this patch that you 
decided to do it here? An alternative by the way is to replace the high 
watermark and first unstable offset here with `lastStableOffset`, which would 
represent the lower bound of the two.
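       Concretely, something like the following sketch; it assumes the surrounding code keeps collecting `Option[Long]` upper bounds and taking their minimum, as the existing entries suggest:
    ```scala
    val firstUncleanableDirtyOffset: Long = Seq(
      // the active segment is always uncleanable
      Option(log.activeSegment.baseOffset),
      // lastStableOffset is min(highWatermark, firstUnstableOffset), so one entry
      // covers both bounds
      Option(log.lastStableOffset)
    ).flatten.min
    ```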

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -683,6 +702,8 @@ private[log] class Cleaner(val id: Int,
     }
 
     var position = 0
+    var destSegment = dest
+    val topicPartition = log.topicPartition

Review comment:
       nit: does not feel like a very useful redefinition.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2393,6 +2407,10 @@ class Log(@volatile private var _dir: File,
       }
       // okay we are safe now, remove the swap suffix
       sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+
+      // If not recovered swap file we need to increment logStartOffset here. Otherwise, we do this when loading the log.
+      if (!isRecoveredSwapFile)
+         maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, LogCompaction)

Review comment:
       Maybe we need a separate reason for the segment splitting case?
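       Purely as an illustration (the trait and case object names below are assumptions about this patch's reason type, not code from it), the split path in `splitOverflowedSegment` could get its own case:
    ```scala
    // Hypothetical: a dedicated reason for segment splitting, so log start offset moves
    // triggered by splitting are distinguishable from those triggered by compaction.
    sealed trait LogStartOffsetIncrementReason
    case object LogCompaction extends LogStartOffsetIncrementReason
    case object SegmentSplit extends LogStartOffsetIncrementReason
    ```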



