Repository: kafka Updated Branches: refs/heads/trunk 60a5a523b -> 0c32bc992
KAFKA-3105: Use `Utils.atomicMoveWithFallback` instead of `File.renameTo`. It behaves better on Windows and provides more useful error messages. Also: * Minor inconsistency fix in `kafka.server.OffsetCheckpoint`. * Remove the temp-file delete from the `streams.state.OffsetCheckpoint` constructor (similar to the change in `kafka.server.OffsetCheckpoint` in https://github.com/apache/kafka/commit/836cb1963330a9e342379899e0fe52b72347736e#diff-2503b32f29cbbd61ed8316f127829455L29). Author: Ismael Juma <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #771 from ijuma/kafka-3105-use-atomic-move-with-fallback-instead-of-rename Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0c32bc99 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0c32bc99 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0c32bc99 Branch: refs/heads/trunk Commit: 0c32bc99265c4645fad2e3244dc2c697bfd9a229 Parents: 60a5a52 Author: Ismael Juma <[email protected]> Authored: Mon Jan 18 09:47:32 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon Jan 18 09:47:32 2016 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/log/FileMessageSet.scala | 10 +++++----- core/src/main/scala/kafka/log/LogSegment.scala | 20 +++++++++++++------- core/src/main/scala/kafka/log/OffsetIndex.scala | 11 ++++++----- .../kafka/server/BrokerMetadataCheckpoint.scala | 10 ++-------- .../scala/kafka/server/OffsetCheckpoint.scala | 4 ++-- .../test/scala/unit/kafka/log/CleanerTest.scala | 7 ++++--- .../kafka/streams/state/OffsetCheckpoint.java | 16 +++------------- 7 files changed, 35 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/main/scala/kafka/log/FileMessageSet.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index b239a6c..d4ce498 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.network.TransportLayer +import org.apache.kafka.common.utils.Utils /** * An on-disk message set. An optional start and end position can be applied to the message set @@ -291,12 +292,11 @@ class FileMessageSet private[kafka](@volatile var file: File, /** * Rename the file that backs this message set - * @return true iff the rename was successful + * @throws IOException if rename fails. */ - def renameTo(f: File): Boolean = { - val success = this.file.renameTo(f) - this.file = f - success + def renameTo(f: File) { + try Utils.atomicMoveWithFallback(file.toPath, f.toPath) + finally this.file = f } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index d604d6c..aa37d52 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -23,7 +23,7 @@ import kafka.server.{LogOffsetMetadata, FetchDataInfo} import org.apache.kafka.common.errors.CorruptRecordException import scala.math._ -import java.io.File +import java.io.{IOException, File} /** @@ -256,12 +256,18 @@ class LogSegment(val log: FileMessageSet, * Change the suffix for the index and log file for this log segment */ def changeFileSuffixes(oldSuffix: String, newSuffix: String) { - val logRenamed = log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) - 
if(!logRenamed) - throw new KafkaStorageException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) - val indexRenamed = index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix))) - if(!indexRenamed) - throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) + + def kafkaStorageException(fileType: String, e: IOException) = + new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e) + + try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) + catch { + case e: IOException => throw kafkaStorageException("log", e) + } + try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix))) + catch { + case e: IOException => throw kafkaStorageException("index", e) + } } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/main/scala/kafka/log/OffsetIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 84d18bd..e95c9d1 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -17,6 +17,8 @@ package kafka.log +import org.apache.kafka.common.utils.Utils + import scala.math._ import java.io._ import java.nio._ @@ -338,12 +340,11 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi /** * Rename the file that backs this offset index - * @return true iff the rename was successful + * @throws IOException if rename fails */ - def renameTo(f: File): Boolean = { - val success = this.file.renameTo(f) - this.file = f - success + def renameTo(f: File) { + try Utils.atomicMoveWithFallback(file.toPath, f.toPath) + 
finally this.file = f } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala index 6e8d68d..00e5d0c 100755 --- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala +++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala @@ -43,16 +43,10 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging { fileOutputStream.flush() fileOutputStream.getFD().sync() fileOutputStream.close() - // swap new BrokerMetadata file with previous one - if(!temp.renameTo(file)) { - // renameTo() fails on windows if destination file exists. - file.delete() - if(!temp.renameTo(file)) - throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath(), file.getAbsolutePath())) - } + Utils.atomicMoveWithFallback(temp.toPath, file.toPath) } catch { case ie: IOException => - error("Failed to write meta.properties due to ",ie) + error("Failed to write meta.properties due to", ie) throw ie } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/main/scala/kafka/server/OffsetCheckpoint.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala index fe1d823..77f283c 100644 --- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala +++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala @@ -71,7 +71,7 @@ class OffsetCheckpoint(val file: File) extends Logging { def read(): Map[TopicAndPartition, Long] = { def malformedLineException(line: String) = - throw new IOException(s"Malformed line in offset checkpoint file: $line'") + new IOException(s"Malformed line in offset checkpoint file: $line'") 
lock synchronized { val reader = new BufferedReader(new FileReader(file)) @@ -104,7 +104,7 @@ class OffsetCheckpoint(val file: File) extends Logging { throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version) } } catch { - case e: NumberFormatException => malformedLineException(line) + case e: NumberFormatException => throw malformedLineException(line) } finally { reader.close() } http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/core/src/test/scala/unit/kafka/log/CleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index 8ab9f91..a8092de 100755 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -19,6 +19,7 @@ package kafka.log import java.io.File import java.nio._ +import java.nio.file.Paths import java.util.Properties import java.util.concurrent.atomic.AtomicLong @@ -376,7 +377,7 @@ class CleanerTest extends JUnitSuite { // On recovery, clean operation is aborted. All messages should be present in the log log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix) for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { - file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) + Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) } log = recoverAndCheck(config, allKeys) @@ -388,7 +389,7 @@ class CleanerTest extends JUnitSuite { // renamed to .deleted. Clean operation is resumed during recovery. 
log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix) for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { - file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) + Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) } log = recoverAndCheck(config, cleanedKeys) @@ -478,4 +479,4 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap { def size: Int = map.size -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0c32bc99/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java index e04de68..d748aac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; import java.io.BufferedReader; import java.io.BufferedWriter; @@ -55,7 +56,6 @@ public class OffsetCheckpoint { private final Object lock; public OffsetCheckpoint(File file) throws IOException { - new File(file + ".tmp").delete(); // try to delete any existing temp files for cleanliness this.file = file; this.lock = new Object(); } @@ -71,26 +71,16 @@ public class OffsetCheckpoint { writeIntLine(writer, VERSION); writeIntLine(writer, offsets.size()); - // write the entries for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) writeEntry(writer, entry.getKey(), entry.getValue()); - // flush the buffer and then fsync the underlying file writer.flush(); fileOutputStream.getFD().sync(); } finally { writer.close(); } - // swap new 
offset checkpoint file with previous one - if (!temp.renameTo(file)) { - // renameTo() fails on Windows if the destination file exists. - file.delete(); - if (!temp.renameTo(file)) - throw new IOException(String.format("File rename from %s to %s failed.", - temp.getAbsolutePath(), - file.getAbsolutePath())); - } + Utils.atomicMoveWithFallback(temp.toPath(), file.toPath()); } } @@ -122,7 +112,7 @@ public class OffsetCheckpoint { switch (version) { case 0: int expectedSize = readInt(reader); - Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(); + Map<TopicPartition, Long> offsets = new HashMap<>(); String line = reader.readLine(); while (line != null) { String[] pieces = line.split("\\s+");
