Updated Branches: refs/heads/flume-1.3.0 3f49bef22 -> a221a8e99
FLUME-1600. FileChannel metadata files should be written to a temp file and then moved over existing files. (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a221a8e9 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a221a8e9 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a221a8e9 Branch: refs/heads/flume-1.3.0 Commit: a221a8e99155e06c3740d91c9e54d4f150044830 Parents: 3f49bef Author: Hari Shreedharan <[email protected]> Authored: Mon Sep 24 16:09:28 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Mon Sep 24 16:17:15 2012 -0700 ---------------------------------------------------------------------- .../org/apache/flume/channel/file/LogFileV3.java | 58 +++++++++------ .../org/apache/flume/channel/file/TestLogFile.java | 22 ++++++ 2 files changed, 58 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/a221a8e9/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java index 5de6e82..32ebac7 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; +import com.google.protobuf.GeneratedMessage; /** * Represents a single data file on disk. Has methods to write, @@ -83,17 +84,7 @@ class LogFileV3 extends LogFile { logFileMetaData = metaDataBuilder.build(); LOGGER.info("Updating " + metaDataFile.getName() + " currentPosition = " + currentPosition + ", logWriteOrderID = " + logWriteOrderID); - FileOutputStream outputStream = new FileOutputStream(metaDataFile); - try { - logFileMetaData.writeDelimitedTo(outputStream); - outputStream.getChannel().force(true); - } finally { - try { - outputStream.close(); - } catch(IOException e) { - LOGGER.warn("Unable to close " + metaDataFile, e); - } - } + writeDelimitedTo(logFileMetaData, metaDataFile); } } @@ -128,6 +119,39 @@ class LogFileV3 extends LogFile { } } + /** + * Writes a GeneratedMessage to a temp file, synchronizes it to disk + * and then renames the file over file. + * @param msg GeneratedMessage to write to the file + * @param file destination file + * @throws IOException if a write error occurs or the File.renameTo + * method returns false meaning the file could not be overwritten. + */ + public static void writeDelimitedTo(GeneratedMessage msg, File file) + throws IOException { + File tmp = new File(file.getParentFile(), file.getName() + ".tmp"); + FileOutputStream outputStream = new FileOutputStream(tmp); + boolean closed = false; + try { + msg.writeDelimitedTo(outputStream); + outputStream.getChannel().force(true); + outputStream.close(); + closed = true; + if(!tmp.renameTo(file)) { + throw new IOException("Unable to move " + tmp + " over " + file); + } + } finally { + if(!closed) { + try { + outputStream.close(); + } catch(IOException e) { + LOGGER.warn("Unable to close " + tmp, e); + } + } + } + + } + static class Writer extends LogFile.Writer { Writer(File file, int logFileID, long maxFileSize, @Nullable Key encryptionKey, @@ -155,17 +179,7 @@ class LogFileV3 extends LogFile { metaDataBuilder.setCheckpointPosition(0L); metaDataBuilder.setCheckpointWriteOrderID(0L); File metaDataFile = Serialization.getMetaDataFile(file); - FileOutputStream outputStream = new FileOutputStream(metaDataFile); - try { - metaDataBuilder.build().writeDelimitedTo(outputStream); - outputStream.getChannel().force(true); - } finally { - try { - outputStream.close(); - } catch(IOException e) { - LOGGER.warn("Unable to close " + metaDataFile, e); - } - } + writeDelimitedTo(metaDataBuilder.build(), metaDataFile); } @Override int getVersion() { http://git-wip-us.apache.org/repos/asf/flume/blob/a221a8e9/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java index 9fc834e..4b69698 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java @@ -19,6 +19,7 @@ package org.apache.flume.channel.file; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -30,6 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.io.FileUtils; +import org.apache.flume.channel.file.proto.ProtosFactory; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -165,4 +167,24 @@ public class TestLogFile { Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody())); } } + @Test + public void testWriteDelimitedTo() throws IOException { + if(dataFile.isFile()) { + Assert.assertTrue(dataFile.delete()); + } + Assert.assertTrue(dataFile.createNewFile()); + ProtosFactory.LogFileMetaData.Builder metaDataBuilder = + ProtosFactory.LogFileMetaData.newBuilder(); + metaDataBuilder.setVersion(1); + metaDataBuilder.setLogFileID(2); + metaDataBuilder.setCheckpointPosition(3); + metaDataBuilder.setCheckpointWriteOrderID(4); + LogFileV3.writeDelimitedTo(metaDataBuilder.build(), dataFile); + ProtosFactory.LogFileMetaData metaData = ProtosFactory.LogFileMetaData. + parseDelimitedFrom(new FileInputStream(dataFile)); + Assert.assertEquals(1, metaData.getVersion()); + Assert.assertEquals(2, metaData.getLogFileID()); + Assert.assertEquals(3, metaData.getCheckpointPosition()); + Assert.assertEquals(4, metaData.getCheckpointWriteOrderID()); + } }
