[ https://issues.apache.org/jira/browse/KAFKA-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhanxiang (Patrick) Huang resolved KAFKA-7452.
----------------------------------------------
    Resolution: Duplicate

KAFKA-7557 fixed this.

> Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7452
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7452
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.0.0, 1.0.1, 1.1.0, 1.1.1, 2.0.0
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Major
>
> After KAFKA-5829, Kafka iterates through all the partition directories to delete useless snapshot files in "checkpointLogRecoveryOffsetsInDir". Currently, "checkpointLogRecoveryOffsetsInDir" is used in the following places:
> # Truncation
> # Log dir deletion and movement
> # Background thread checkpointing recovery offsets
> In a 2.0 deployment on a cluster with 10k partitions per broker, we found that deleting useless snapshot files in the critical path of log truncation can significantly slow down followers catching up with the leader during a rolling bounce (~2x slower than 0.11). The reason is that we essentially do an "ls -R" over the whole data directory just to potentially delete the snapshot files of a single partition, because snapshot files are identified by listing each directory and checking the filename suffix.
> In our case, "listSnapshotFiles" takes ~1ms per partition directory, so it takes ~1ms * 10k = ~10s to delete snapshot files after truncating a single partition, which delays subsequent fetches in the fetcher thread.
> Here are the related code snippets:
> LogManager.scala
> {code:java}
>   private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
>     for {
>       partitionToLog <- logsByDir.get(dir.getAbsolutePath)
>       checkpoint <- recoveryPointCheckpoints.get(dir)
>     } {
>       try {
>         checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
>         allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
>       } catch {
>         case e: IOException =>
>           logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
>             s"Disk error while writing to recovery point " +
>             s"file in directory $dir", e)
>       }
>     }
>   }
> {code}
> ProducerStateManager.scala
> {code:java}
>   private[log] def listSnapshotFiles(dir: File): Seq[File] = {
>     if (dir.exists && dir.isDirectory) {
>       Option(dir.listFiles).map { files =>
>         files.filter(f => f.isFile && isSnapshotFile(f)).toSeq
>       }.getOrElse(Seq.empty)
>     } else Seq.empty
>   }
>
>   private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => true) {
>     listSnapshotFiles(dir).filter(file => predicate(offsetFromFile(file))).foreach { file =>
>       Files.deleteIfExists(file.toPath)
>     }
>   }
> {code}
> There are a few things that can be optimized here:
> # We can keep an in-memory cache of the snapshot file metadata (filenames) in ProducerStateManager to avoid calling dir.listFiles in "deleteSnapshotFiles", "latestSnapshotFile" and "oldestSnapshotFile" (sketched below, after this list).
> # After truncation, we can try to delete snapshot files only for the truncated partitions (the replica fetcher thread truncates one partition at a time) instead of for all partitions. Or we may not even need to delete snapshot files in the critical path of truncation, since the background log-recovery-offset-checkpoint thread will do it periodically. The same applies to log deletion/movement (also sketched below).
> # If we want to further optimize the actual snapshot file deletion, we can make it asynchronous, but I am not sure whether that is still needed once we have 1) and 2).
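> For 1), here is a minimal, self-contained sketch of the idea, assuming a hypothetical CachedSnapshotIndex class; the helper names, the ".snapshot" suffix handling, and the file-name formatting are illustrative, not the actual ProducerStateManager API:
> {code:java}
> import java.io.File
> import java.nio.file.Files
> import scala.collection.mutable
>
> // Hypothetical cache of producer-snapshot offsets for one partition directory.
> // The directory is listed once at construction time; afterwards delete/latest/oldest
> // lookups consult the in-memory set instead of calling dir.listFiles every time.
> class CachedSnapshotIndex(dir: File) {
>   private val snapshotOffsets: mutable.SortedSet[Long] =
>     mutable.SortedSet(listSnapshotFiles(dir).map(offsetFromFile): _*)
>
>   // Keep the cache in sync when a new snapshot is written.
>   def addSnapshot(offset: Long): Unit = snapshotOffsets += offset
>
>   def latestSnapshotOffset: Option[Long] = snapshotOffsets.lastOption
>   def oldestSnapshotOffset: Option[Long] = snapshotOffsets.headOption
>
>   // Delete only the cached entries matching the predicate; no directory scan needed.
>   def deleteSnapshotFiles(predicate: Long => Boolean): Unit = {
>     val toDelete = snapshotOffsets.filter(predicate).toSeq
>     toDelete.foreach { offset =>
>       Files.deleteIfExists(snapshotFile(offset).toPath)
>       snapshotOffsets -= offset
>     }
>   }
>
>   // Illustrative stand-ins for the real ProducerStateManager helpers.
>   private def listSnapshotFiles(dir: File): Seq[File] =
>     Option(dir.listFiles)
>       .map(_.filter(f => f.isFile && f.getName.endsWith(".snapshot")).toSeq)
>       .getOrElse(Seq.empty)
>   private def offsetFromFile(file: File): Long =
>     file.getName.stripSuffix(".snapshot").toLong
>   private def snapshotFile(offset: Long): File =
>     new File(dir, f"$offset%020d.snapshot")
> }
> {code}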
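> For 2), a rough sketch of how the LogManager method quoted above could take only the affected logs instead of iterating over allLogs; it still relies on the surrounding LogManager members, and the extra logsToClean parameter plus the call sites below are illustrative, not actual Kafka code:
> {code:java}
>   private def checkpointLogRecoveryOffsetsInDir(dir: File, logsToClean: Iterable[Log]): Unit = {
>     for {
>       partitionToLog <- logsByDir.get(dir.getAbsolutePath)
>       checkpoint <- recoveryPointCheckpoints.get(dir)
>     } {
>       try {
>         checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
>         // Only the logs that were actually truncated/deleted/moved pay the listFiles
>         // cost, not every log on the broker.
>         logsToClean.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
>       } catch {
>         case e: IOException =>
>           logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath,
>             s"Disk error while writing to recovery point " +
>             s"file in directory $dir", e)
>       }
>     }
>   }
>
>   // Truncation path (the replica fetcher truncates one partition at a time):
>   //   checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile, Seq(log))
>   // The periodic checkpoint thread can keep cleaning every log in the dir:
>   //   checkpointLogRecoveryOffsetsInDir(dir, logsByDir(dir.getAbsolutePath).values)
> {code}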
> Also, we noticed that there is no way to disable transaction/exactly-once support on the broker side, even though it brings in some extra overhead when no clients use the feature. Not sure whether this is a common use case, but it would be useful to have a switch to avoid the extra performance overhead.
>


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)