Repository: kafka
Updated Branches:
  refs/heads/trunk 592678e4d -> 1e4b0841b
KAFKA-1539 Fsync offset checkpoint file after writing.

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1e4b0841
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1e4b0841
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1e4b0841

Branch: refs/heads/trunk
Commit: 1e4b0841b37e9e6526d7a7a7c643b1369d9f03c5
Parents: 592678e
Author: Jay Kreps <jay.kr...@gmail.com>
Authored: Mon Jul 21 10:22:50 2014 -0700
Committer: Jay Kreps <jay.kr...@gmail.com>
Committed: Mon Jul 21 10:22:50 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/OffsetCheckpoint.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/1e4b0841/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 7af2f43..8c5b054 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -34,7 +34,8 @@ class OffsetCheckpoint(val file: File) extends Logging {
       // write to temp file and then swap with the existing file
       val temp = new File(file.getAbsolutePath + ".tmp")
 
-      val writer = new BufferedWriter(new FileWriter(temp))
+      val fileOutputStream = new FileOutputStream(temp)
+      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
       try {
         // write the current version
         writer.write(0.toString)
@@ -50,8 +51,9 @@ class OffsetCheckpoint(val file: File) extends Logging {
           writer.newLine()
         }
 
-        // flush and overwrite old file
+        // flush the buffer and then fsync the underlying file
         writer.flush()
+        fileOutputStream.getFD().sync()
       } finally {
         writer.close()
       }