Zhanxiang (Patrick) Huang created KAFKA-7452:
------------------------------------------------

             Summary: Deleting snapshot files after check-pointing log recovery 
offsets can slow down replication when truncation happens
                 Key: KAFKA-7452
                 URL: https://issues.apache.org/jira/browse/KAFKA-7452
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.0.0, 1.1.1, 1.1.0, 1.0.1, 1.0.0
            Reporter: Zhanxiang (Patrick) Huang


After KAFKA-5829, Kafka iterates through all the partition directories to 
delete stale snapshot files in "checkpointLogRecoveryOffsetsInDir". 
Currently, "checkpointLogRecoveryOffsetsInDir" is used in the following places:
 # Truncation
 # Log dir deletion and movement
 # Background thread checkpointing recovery offsets

In a 2.0 deployment on a cluster with 10k partitions per broker, we found that 
deleting stale snapshot files in the critical path of log truncation can 
significantly slow down followers catching up with the leader during a rolling 
bounce (~2x slower than 0.11). The reason is that we effectively do an "ls -R" 
of the whole data directory only to potentially delete the snapshot files in 
one partition directory, because snapshot files are identified by listing the 
directories and checking the filename suffix.

In our case, "listSnapshotFiles" takes ~1ms per partition directory, so it takes 
~1ms * 10k = ~10s just to delete the snapshot files for one partition after the 
truncation, which delays subsequent fetches in the fetcher thread.

Here are the related code snippets:

 LogManager.scala

 
{code:java}
private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
  for {
    partitionToLog <- logsByDir.get(dir.getAbsolutePath)
    checkpoint <- recoveryPointCheckpoints.get(dir)
  } {
    try {
      checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
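      // Problematic part: iterates over every log on the broker, and each call lists
      // that partition's directory to find snapshot files (see ProducerStateManager.scala below)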
      allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
    } catch {
      case e: IOException =>
        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
          s"Disk error while writing to recovery point file in directory $dir", e)
    }
  }
}
{code}
 

 ProducerStateManager.scala

 
{code:java}
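// Snapshot files are found by listing the whole partition directory and checking filename suffixes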
private[log] def listSnapshotFiles(dir: File): Seq[File] = {
  if (dir.exists && dir.isDirectory) {
    Option(dir.listFiles).map { files =>
      files.filter(f => f.isFile && isSnapshotFile(f)).toSeq
    }.getOrElse(Seq.empty)
  } else Seq.empty
}


private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => true) {
  listSnapshotFiles(dir).filter(file => predicate(offsetFromFile(file))).foreach { file =>
    Files.deleteIfExists(file.toPath)
  }
}

{code}
 

There are a few things that can be optimized here:
 # We can keep an in-memory cache of the snapshot file metadata (filenames/offsets) in 
ProducerStateManager to avoid calling dir.listFiles in "deleteSnapshotFiles", 
"latestSnapshotFile" and "oldestSnapshotFile" (see the first sketch after this list).
 # After truncation, we can delete snapshot files only for the truncated 
partitions (in the replica fetcher thread, we truncate one partition at a time) 
instead of for all partitions (see the second sketch after this list). Or maybe 
we don't even need to delete snapshot files in the critical path of truncation, 
because the background log-recovery-offset-checkpoint-thread will do it 
periodically. The same applies to log deletion/movement.
 # If we want to further optimize the actual snapshot file deletion, we can 
make it asynchronous. But I am not sure whether that is still needed once we have 1) and 2).
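
For 1), a minimal sketch of what such a cache could look like, assuming a 
hypothetical SnapshotFileCache helper inside ProducerStateManager (names and 
API are illustrative, not actual Kafka code):

{code:java}
import java.io.File
import java.nio.file.Files
import scala.collection.concurrent.TrieMap

// Hypothetical in-memory cache of producer state snapshot files for one log directory
class SnapshotFileCache {
  // offset -> snapshot file; populated from a single dir.listFiles call when the log is loaded
  private val snapshots = TrieMap.empty[Long, File]

  def add(offset: Long, file: File): Unit = {
    snapshots.put(offset, file)
  }

  // Highest-offset snapshot without touching the file system
  def latest: Option[File] = snapshots.keys.reduceOption(_ max _).flatMap(snapshots.get)

  // Lowest-offset snapshot without touching the file system
  def oldest: Option[File] = snapshots.keys.reduceOption(_ min _).flatMap(snapshots.get)

  // Delete only the cached snapshot files matching the predicate; no directory listing needed
  def deleteIf(predicate: Long => Boolean = _ => true): Unit = {
    snapshots.keys.filter(predicate).foreach { offset =>
      snapshots.remove(offset).foreach(f => Files.deleteIfExists(f.toPath))
    }
  }
}
{code}

With something like this, deletion and latest/oldest lookups become in-memory 
operations and dir.listFiles is needed only once when the log is loaded.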
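
For 2), a sketch of how "checkpointLogRecoveryOffsetsInDir" could take the 
affected logs as a parameter so that only the truncated partition's directory 
is touched ("logsToClean" is a hypothetical parameter, not the actual signature):

{code:java}
// Hypothetical variant of the LogManager method quoted above: only the logs that
// were actually affected (e.g. the single truncated partition) are cleaned,
// instead of allLogs.
private def checkpointLogRecoveryOffsetsInDir(dir: File, logsToClean: Iterable[Log]): Unit = {
  for {
    partitionToLog <- logsByDir.get(dir.getAbsolutePath)
    checkpoint <- recoveryPointCheckpoints.get(dir)
  } {
    try {
      checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
      // Only the affected logs list their directories for snapshot deletion
      logsToClean.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
    } catch {
      case e: IOException =>
        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
          s"Disk error while writing to recovery point file in directory $dir", e)
    }
  }
}
{code}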

Also, we notice that there is no way to disable transaction/exactly-once 
support on the broker side, even though it adds some overhead when no clients 
use the feature. Not sure whether this is a common use case, but it would be 
useful to have a switch to avoid the extra performance overhead.

 

 


