Updated Branches: refs/heads/flume-1.4 2419550b3 -> 6cbe4a58e
FLUME-1891: Fast replay runs even when checkpoint exists (Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6cbe4a58 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6cbe4a58 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6cbe4a58 Branch: refs/heads/flume-1.4 Commit: 6cbe4a58e243d8d9cf241ba10f958b7eaf52c3dd Parents: 2419550 Author: Brock Noland <[email protected]> Authored: Fri Feb 1 14:17:27 2013 -0600 Committer: Brock Noland <[email protected]> Committed: Fri Feb 1 14:17:40 2013 -0600 ---------------------------------------------------------------------- .../org/apache/flume/channel/file/FileChannel.java | 6 ++ .../java/org/apache/flume/channel/file/Log.java | 24 +++++-- .../flume/channel/file/TestFileChannelRestart.java | 53 +++++++++++++++ 3 files changed, 77 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/6cbe4a58/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index d921387..d98209b 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.flume.Channel; import org.apache.flume.ChannelException; import org.apache.flume.Context; @@ -385,6 +386,11 @@ public class FileChannel extends BasicChannelSemantics { } } + @VisibleForTesting + boolean didFastReplay() { + return log.didFastReplay(); + } + public boolean isOpen() { return open; } http://git-wip-us.apache.org/repos/asf/flume/blob/6cbe4a58/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 8a4201c..7da8c49 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -44,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import javax.annotation.Nullable; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.flume.ChannelException; import org.apache.flume.Event; @@ -113,6 +114,7 @@ class Log { private String encryptionKeyAlias; private Key encryptionKey; private final long usableSpaceRefreshInterval; + private boolean didFastReplay = false; static class Builder { private long bCheckpointInterval; @@ -340,17 +342,17 @@ class Log { */ LogUtils.sort(dataFiles); - boolean useFastReplay = this.useFastReplay; + boolean shouldFastReplay = this.useFastReplay; /* * Read the checkpoint (in memory queue) from one of two alternating * locations. We will read the last one written to disk. */ File checkpointFile = new File(checkpointDir, "checkpoint"); - if(useFastReplay) { + if(shouldFastReplay) { if(checkpointFile.exists()) { LOGGER.debug("Disabling fast full replay because checkpoint " + "exists: " + checkpointFile); - useFastReplay = false; + shouldFastReplay = false; } else { LOGGER.debug("Not disabling fast full replay because checkpoint " + " does not exist: " + checkpointFile); @@ -379,7 +381,7 @@ class Log { * but the inflights were not. If the checkpoint was bad, the backing * store factory would have thrown. */ - doReplay(queue, dataFiles, encryptionKeyProvider); + doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay); } catch (BadCheckpointException ex) { LOGGER.warn("Checkpoint may not have completed successfully. " + "Forcing full replay, this may take a while.", ex); @@ -391,7 +393,10 @@ class Log { queueCapacity, channelNameDescriptor); queue = new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile); - doReplay(queue, dataFiles, encryptionKeyProvider); + // If the checkpoint was deleted due to BadCheckpointException, then + // trigger fast replay if the channel is configured to. + shouldFastReplay = this.useFastReplay; + doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay); } @@ -419,10 +424,12 @@ class Log { @SuppressWarnings("deprecation") private void doReplay(FlumeEventQueue queue, List<File> dataFiles, - KeyProvider encryptionKeyProvider) throws Exception { + KeyProvider encryptionKeyProvider, + boolean useFastReplay) throws Exception { CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles, queue); if (useFastReplay && rebuilder.rebuild()) { + didFastReplay = true; LOGGER.info("Fast replay successful."); } else { ReplayHandler replayHandler = new ReplayHandler(queue, @@ -437,6 +444,11 @@ class Log { } } + @VisibleForTesting + boolean didFastReplay() { + return didFastReplay; + } + int getNextFileID() { Preconditions.checkState(open, "Log is closed"); return nextFileID.get(); http://git-wip-us.apache.org/repos/asf/flume/blob/6cbe4a58/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java index ea57cdb..170dc72 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -299,6 +299,59 @@ public class TestFileChannelRestart extends TestFileChannelBase { testCorruptInflights("inflightTakes"); } + @Test + public void testFastReplayWithCheckpoint() throws Exception{ + testFastReplay(false, true); + } + + @Test + public void testFastReplayWithBadCheckpoint() throws Exception{ + testFastReplay(true, true); + } + + @Test + public void testNoFastReplayWithCheckpoint() throws Exception{ + testFastReplay(false, false); + } + + @Test + public void testNoFastReplayWithBadCheckpoint() throws Exception{ + testFastReplay(true, false); + } + + private void testFastReplay(boolean shouldCorruptCheckpoint, + boolean useFastReplay) throws Exception{ + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_FAST_REPLAY, + String.valueOf(useFastReplay)); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> in = putEvents(channel, "restart", 10, 100); + Assert.assertEquals(100, in.size()); + forceCheckpoint(channel); + channel.stop(); + if (shouldCorruptCheckpoint) { + File checkpoint = new File(checkpointDir, "checkpoint"); + RandomAccessFile writer = new RandomAccessFile( + Serialization.getMetaDataFile(checkpoint), "rw"); + writer.seek(10); + writer.writeLong(new Random().nextLong()); + writer.getFD().sync(); + writer.close(); + } + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> out = consumeChannel(channel); + if (useFastReplay && shouldCorruptCheckpoint) { + Assert.assertTrue(channel.didFastReplay()); + } else { + Assert.assertFalse(channel.didFastReplay()); + } + compareInputAndOut(in, out); + } + private void testCorruptInflights(String name) throws Exception { Map<String, String> overrides = Maps.newHashMap(); channel = createFileChannel(overrides);
