Updated Branches:
  refs/heads/flume-1.3.0 3f49bef22 -> a221a8e99

FLUME-1600. FileChannel metadata files should be written to a temp file and 
then moved over existing files.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a221a8e9
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a221a8e9
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a221a8e9

Branch: refs/heads/flume-1.3.0
Commit: a221a8e99155e06c3740d91c9e54d4f150044830
Parents: 3f49bef
Author: Hari Shreedharan <[email protected]>
Authored: Mon Sep 24 16:09:28 2012 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Mon Sep 24 16:17:15 2012 -0700

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/LogFileV3.java   |   58 +++++++++------
 .../org/apache/flume/channel/file/TestLogFile.java |   22 ++++++
 2 files changed, 58 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a221a8e9/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index 5de6e82..32ebac7 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
 
 /**
  * Represents a single data file on disk. Has methods to write,
@@ -83,17 +84,7 @@ class LogFileV3 extends LogFile {
       logFileMetaData = metaDataBuilder.build();
       LOGGER.info("Updating " + metaDataFile.getName()  + " currentPosition = "
           + currentPosition + ", logWriteOrderID = " + logWriteOrderID);
-      FileOutputStream outputStream = new FileOutputStream(metaDataFile);
-      try {
-        logFileMetaData.writeDelimitedTo(outputStream);
-        outputStream.getChannel().force(true);
-      } finally {
-        try {
-          outputStream.close();
-        } catch(IOException e) {
-          LOGGER.warn("Unable to close " + metaDataFile, e);
-        }
-      }
+      writeDelimitedTo(logFileMetaData, metaDataFile);
     }
   }
 
@@ -128,6 +119,39 @@ class LogFileV3 extends LogFile {
     }
   }
 
+  /**
+   * Writes a GeneratedMessage to a temp file, synchronizes it to disk
+   * and then renames the file over file.
+   * @param msg GeneratedMessage to write to the file
+   * @param file destination file
+   * @throws IOException if a write error occurs or the File.renameTo
+   * method returns false meaning the file could not be overwritten.
+   */
+  public static void writeDelimitedTo(GeneratedMessage msg, File file)
+  throws IOException {
+    File tmp = new File(file.getParentFile(), file.getName() + ".tmp");
+    FileOutputStream outputStream = new FileOutputStream(tmp);
+    boolean closed = false;
+    try {
+      msg.writeDelimitedTo(outputStream);
+      outputStream.getChannel().force(true);
+      outputStream.close();
+      closed = true;
+      if(!tmp.renameTo(file)) {
+        throw new IOException("Unable to move " + tmp + " over " + file);
+      }
+    } finally {
+      if(!closed) {
+        try {
+          outputStream.close();
+        } catch(IOException e) {
+          LOGGER.warn("Unable to close " + tmp, e);
+        }
+      }
+    }
+
+  }
+
   static class Writer extends LogFile.Writer {
     Writer(File file, int logFileID, long maxFileSize,
         @Nullable Key encryptionKey,
@@ -155,17 +179,7 @@ class LogFileV3 extends LogFile {
       metaDataBuilder.setCheckpointPosition(0L);
       metaDataBuilder.setCheckpointWriteOrderID(0L);
       File metaDataFile = Serialization.getMetaDataFile(file);
-      FileOutputStream outputStream = new FileOutputStream(metaDataFile);
-      try {
-        metaDataBuilder.build().writeDelimitedTo(outputStream);
-        outputStream.getChannel().force(true);
-      } finally {
-        try {
-          outputStream.close();
-        } catch(IOException e) {
-          LOGGER.warn("Unable to close " + metaDataFile, e);
-        }
-      }
+      writeDelimitedTo(metaDataBuilder.build(), metaDataFile);
     }
     @Override
     int getVersion() {

http://git-wip-us.apache.org/repos/asf/flume/blob/a221a8e9/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
index 9fc834e..4b69698 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
@@ -19,6 +19,7 @@
 package org.apache.flume.channel.file;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -30,6 +31,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.proto.ProtosFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -165,4 +167,24 @@ public class TestLogFile {
       Assert.assertTrue(Arrays.equals(eventIn.getBody(), eventOut.getBody()));
     }
   }
+  @Test
+  public void testWriteDelimitedTo() throws IOException {
+    if(dataFile.isFile()) {
+      Assert.assertTrue(dataFile.delete());
+    }
+    Assert.assertTrue(dataFile.createNewFile());
+    ProtosFactory.LogFileMetaData.Builder metaDataBuilder =
+        ProtosFactory.LogFileMetaData.newBuilder();
+    metaDataBuilder.setVersion(1);
+    metaDataBuilder.setLogFileID(2);
+    metaDataBuilder.setCheckpointPosition(3);
+    metaDataBuilder.setCheckpointWriteOrderID(4);
+    LogFileV3.writeDelimitedTo(metaDataBuilder.build(), dataFile);
+    ProtosFactory.LogFileMetaData metaData = ProtosFactory.LogFileMetaData.
+        parseDelimitedFrom(new FileInputStream(dataFile));
+    Assert.assertEquals(1, metaData.getVersion());
+    Assert.assertEquals(2, metaData.getLogFileID());
+    Assert.assertEquals(3, metaData.getCheckpointPosition());
+    Assert.assertEquals(4, metaData.getCheckpointWriteOrderID());
+  }
 }

Reply via email to