Updated Branches: refs/heads/flume-1.4 a7b9b7eb3 -> ee2f6c0ba
FLUME-1917: FileChannel group commit (coalesce fsync) (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ee2f6c0b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ee2f6c0b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ee2f6c0b Branch: refs/heads/flume-1.4 Commit: ee2f6c0ba43055c236497c79525428376b70bade Parents: a7b9b7e Author: Brock Noland <[email protected]> Authored: Sun Jun 23 14:50:51 2013 -0500 Committer: Brock Noland <[email protected]> Committed: Sun Jun 23 14:51:06 2013 -0500 ---------------------------------------------------------------------- .../java/org/apache/flume/channel/file/Log.java | 11 ++++- .../org/apache/flume/channel/file/LogFile.java | 52 +++++++++++++++++--- .../apache/flume/channel/file/TestLogFile.java | 51 +++++++++++++++++++ 3 files changed, 106 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/ee2f6c0b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 8dc0ff8..8a8cb7f 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -869,14 +869,21 @@ public class Log { boolean error = true; try { try { - logFiles.get(logFileIndex).commit(buffer); + LogFile.Writer logFileWriter = logFiles.get(logFileIndex); + // If multiple transactions are committing at the same time, + // this ensures that the number of actual fsyncs is small and a + // number of them are grouped together into one. + logFileWriter.commit(buffer); + logFileWriter.sync(); error = false; } catch (LogFileRetryableIOException e) { if(!open) { throw e; } roll(logFileIndex, buffer); - logFiles.get(logFileIndex).commit(buffer); + LogFile.Writer logFileWriter = logFiles.get(logFileIndex); + logFileWriter.commit(buffer); + logFileWriter.sync(); error = false; } } finally { http://git-wip-us.apache.org/repos/asf/flume/blob/ee2f6c0b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java index bb8ce1a..62f68c6 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java @@ -167,6 +167,11 @@ public abstract class LogFile { private final CipherProvider.Encryptor encryptor; private final CachedFSUsableSpace usableSpace; private volatile boolean open; + private long lastCommitPosition; + private long lastSyncPosition; + + // To ensure we can count the number of fsyncs. + private long syncCount; Writer(File file, int logFileID, long maxFileSize, @@ -207,9 +212,28 @@ public abstract class LogFile { long getMaxSize() { return maxFileSize; } + + @VisibleForTesting + long getLastCommitPosition(){ + return lastCommitPosition; + } + + @VisibleForTesting + long getLastSyncPosition() { + return lastSyncPosition; + } + + @VisibleForTesting + long getSyncCount() { + return syncCount; + } synchronized long position() throws IOException { return getFileChannel().position(); } + + // encrypt and write methods may not be thread safe in the following + // methods, so all methods need to be synchronized. + synchronized FlumeEventPointer put(ByteBuffer buffer) throws IOException { if(encryptor != null) { buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array())); @@ -229,14 +253,17 @@ public abstract class LogFile { } write(buffer); } + synchronized void commit(ByteBuffer buffer) throws IOException { - if(encryptor != null) { + if (encryptor != null) { buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array())); } write(buffer); - sync(); + lastCommitPosition = position(); } - private Pair<Integer, Integer> write(ByteBuffer buffer) throws IOException { + + private Pair<Integer, Integer> write(ByteBuffer buffer) + throws IOException { if(!isOpen()) { throw new LogFileRetryableIOException("File closed " + file); } @@ -260,14 +287,27 @@ public abstract class LogFile { Preconditions.checkState(wrote == toWrite.limit()); return Pair.of(getLogFileID(), offset); } + synchronized boolean isRollRequired(ByteBuffer buffer) throws IOException { return isOpen() && position() + (long) buffer.limit() > getMaxSize(); } - private void sync() throws IOException { - if(!isOpen()) { + + /** + * Sync the underlying log file to disk. Expensive call, + * should be used only on commits. If a sync has already happened after + * the last commit, this method is a no-op + * @throws IOException + * @throws LogFileRetryableIOException - if this log file is closed. + */ + synchronized void sync() throws IOException { + if (!isOpen()) { throw new LogFileRetryableIOException("File closed " + file); } - getFileChannel().force(false); + if (lastSyncPosition < lastCommitPosition) { + getFileChannel().force(false); + lastSyncPosition = position(); + syncCount++; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/ee2f6c0b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java index 4da6ac1..e5d830e 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java @@ -29,10 +29,14 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.FileUtils; import org.apache.flume.channel.file.proto.ProtosFactory; @@ -285,6 +289,7 @@ public class TestLogFile { FlumeEventPointer ptr = logFileWriter.put(bytes); logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit (transactionID, WriteOrderOracle.next()))); + logFileWriter.sync(); final int offset = ptr.getOffset(); RandomAccessFile writer = new RandomAccessFile(dataFile, "rw"); writer.seek(offset + 1500); @@ -309,6 +314,7 @@ public class TestLogFile { FlumeEventPointer ptr = logFileWriter.put(bytes); logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit (transactionID, WriteOrderOracle.next()))); + logFileWriter.sync(); final int offset = ptr.getOffset(); LogFile.OperationRecordUpdater updater = new LogFile .OperationRecordUpdater(dataFile); @@ -354,6 +360,7 @@ public class TestLogFile { FlumeEventPointer ptr = logFileWriter.put(bytes); logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit (transactionID, WriteOrderOracle.next()))); + logFileWriter.sync(); final int offset = ptr.getOffset(); LogFile.OperationRecordUpdater updater = new LogFile .OperationRecordUpdater(dataFile); @@ -361,4 +368,48 @@ public class TestLogFile { RandomAccessFile fileReader = new RandomAccessFile(dataFile, "rw"); Assert.assertEquals(LogFile.OP_NOOP, fileReader.readByte()); } + + @Test + public void testGroupCommit() throws Exception { + final FlumeEvent eventIn = TestUtils.newPersistableEvent(250); + final CyclicBarrier barrier = new CyclicBarrier(20); + ExecutorService executorService = Executors.newFixedThreadPool(20); + ExecutorCompletionService<Void> completionService = new + ExecutorCompletionService<Void>(executorService); + final LogFile.Writer writer = logFileWriter; + final AtomicLong txnId = new AtomicLong(++transactionID); + for (int i = 0; i < 20; i++) { + completionService.submit(new Callable<Void>() { + @Override + public Void call() { + try { + Put put = new Put(txnId.incrementAndGet(), + WriteOrderOracle.next(), eventIn); + ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); + writer.put(bytes); + writer.commit(TransactionEventRecord.toByteBuffer( + new Commit(txnId.get(), WriteOrderOracle.next()))); + barrier.await(); + writer.sync(); + } catch (Exception ex) { + Throwables.propagate(ex); + } + return null; + } + }); + } + + for(int i = 0; i < 20; i++) { + completionService.take().get(); + } + + //At least 250*20, but can be higher due to serialization overhead + Assert.assertTrue(logFileWriter.position() >= 5000); + Assert.assertEquals(1, writer.getSyncCount()); + Assert.assertTrue(logFileWriter.getLastCommitPosition() == + logFileWriter.getLastSyncPosition()); + + executorService.shutdown(); + + } }
