Repository: flume Updated Branches: refs/heads/flume-1.6 2004b986b -> 2006a5747
FLUME-2595. Add option to checkpoint on file channel shutdown (Roshan Naik via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2006a574 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2006a574 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2006a574 Branch: refs/heads/flume-1.6 Commit: 2006a57478c484639ab343bbbf18bfe796d56172 Parents: 2004b98 Author: Hari Shreedharan <[email protected]> Authored: Mon Apr 13 15:57:30 2015 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Mon Apr 13 15:58:13 2015 -0700 ---------------------------------------------------------------------- .../apache/flume/channel/file/FileChannel.java | 5 ++ .../channel/file/FileChannelConfiguration.java | 3 +- .../java/org/apache/flume/channel/file/Log.java | 22 ++++++- .../org/apache/flume/channel/file/TestLog.java | 35 +++++++++++- flume-ng-doc/sphinx/FlumeUserGuide.rst | 1 + .../tools/TestFileChannelIntegrityTool.java | 60 +++++++++++++------- 6 files changed, 102 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/2006a574/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index 61c353a..ed2b996 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -100,6 +100,7 @@ public class FileChannel extends BasicChannelSemantics { private boolean compressBackupCheckpoint; private boolean fsyncPerTransaction; private int fsyncInterval; + private boolean checkpointOnClose = true; @Override public synchronized void setName(String name) { @@ -251,6 +252,9 @@ public class FileChannel extends BasicChannelSemantics { fsyncInterval = context.getInteger(FileChannelConfiguration .FSYNC_INTERVAL, FileChannelConfiguration.DEFAULT_FSYNC_INTERVAL); + checkpointOnClose = context.getBoolean(FileChannelConfiguration + .CHKPT_ONCLOSE, FileChannelConfiguration.DEFAULT_CHKPT_ONCLOSE); + if(queueRemaining == null) { queueRemaining = new Semaphore(capacity, true); } @@ -286,6 +290,7 @@ public class FileChannel extends BasicChannelSemantics { builder.setBackupCheckpointDir(backupCheckpointDir); builder.setFsyncPerTransaction(fsyncPerTransaction); builder.setFsyncInterval(fsyncInterval); + builder.setCheckpointOnClose(checkpointOnClose); log = builder.build(); log.replay(); open = true; http://git-wip-us.apache.org/repos/asf/flume/blob/2006a574/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java index f8c0378..5c3c48f 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java @@ -98,5 +98,6 @@ public class FileChannelConfiguration { public static final String FSYNC_INTERVAL = "fsyncInterval"; public static final int DEFAULT_FSYNC_INTERVAL = 5; // seconds. - + public static final String CHKPT_ONCLOSE = "checkpointOnClose"; + public static final Boolean DEFAULT_CHKPT_ONCLOSE = true; } http://git-wip-us.apache.org/repos/asf/flume/blob/2006a574/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 0e9171e..247c287 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -128,6 +128,7 @@ public class Log { private final boolean fsyncPerTransaction; private final int fsyncInterval; + private final boolean checkpointOnClose; private int readCount; private int putCount; @@ -158,6 +159,8 @@ public class Log { private boolean fsyncPerTransaction = true; private int fsyncInterval; + private boolean checkpointOnClose = true; + boolean isFsyncPerTransaction() { return fsyncPerTransaction; } @@ -254,13 +257,18 @@ public class Log { return this; } + Builder setCheckpointOnClose(boolean enableCheckpointOnClose) { + this.checkpointOnClose = enableCheckpointOnClose; + return this; + } + Log build() throws IOException { return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity, bUseDualCheckpoints, bCompressBackupCheckpoint,bCheckpointDir, bBackupCheckpointDir, bName, useLogReplayV1, useFastReplay, bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias, bEncryptionCipherProvider, bUsableSpaceRefreshInterval, - fsyncPerTransaction, fsyncInterval, bLogDirs); + fsyncPerTransaction, fsyncInterval, checkpointOnClose, bLogDirs); } } @@ -272,7 +280,7 @@ public class Log { @Nullable String encryptionKeyAlias, @Nullable String encryptionCipherProvider, long usableSpaceRefreshInterval, boolean fsyncPerTransaction, - int fsyncInterval, File... logDirs) + int fsyncInterval, boolean checkpointOnClose, File... logDirs) throws IOException { Preconditions.checkArgument(checkpointInterval > 0, "checkpointInterval <= 0"); @@ -352,6 +360,8 @@ public class Log { this.logDirs = logDirs; this.fsyncPerTransaction = fsyncPerTransaction; this.fsyncInterval = fsyncInterval; + this.checkpointOnClose = checkpointOnClose; + logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length); workerExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name) @@ -791,6 +801,14 @@ public class Log { lockExclusive(); try { open = false; + try { + if(checkpointOnClose) { + writeCheckpoint(true); // do this before acquiring exclusive lock + } + } catch (Exception err) { + LOGGER.warn("Failed creating checkpoint on close of channel " + channelNameDescriptor + + "Replay will take longer next time channel is started.", err); + } shutdownWorker(); if (logFiles != null) { for (int index = 0; index < logFiles.length(); index++) { http://git-wip-us.apache.org/repos/asf/flume/blob/2006a574/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java index f7f1afa..801d925 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java @@ -22,6 +22,8 @@ import static org.mockito.Mockito.*; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.channels.*; +import java.util.Collection; import java.util.List; import org.apache.commons.io.FileUtils; @@ -35,6 +37,8 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.io.Files; +import javax.ws.rs.Path; + public class TestLog { private static final Logger LOGGER = LoggerFactory.getLogger(TestLog.class); private static final long MAX_FILE_SIZE = 1000; @@ -56,7 +60,7 @@ public class TestLog { } log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( - checkpointDir).setLogDirs(dataDirs) + checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(false) .setChannelName("testlog").build(); log.replay(); } @@ -465,6 +469,34 @@ public class TestLog { Long.MAX_VALUE - 1L); } + @Test + public void testCheckpointOnClose() throws Exception { + log.close(); + log = new Log.Builder().setCheckpointInterval(1L).setMaxFileSize( + MAX_FILE_SIZE).setQueueSize(CAPACITY).setCheckpointDir( + checkpointDir).setLogDirs(dataDirs).setCheckpointOnClose(true) + .setChannelName("testLog").build(); + log.replay(); + + + // 1 Write One Event + FlumeEvent eventIn = TestUtils.newPersistableEvent(); + log.put(transactionID, eventIn); + log.commitPut(transactionID); + + // 2 Check state of checkpoint before close + File checkPointMetaFile = + FileUtils.listFiles(checkpointDir,new String[]{"meta"},false).iterator().next(); + long before = FileUtils.checksumCRC32( checkPointMetaFile ); + + // 3 Close Log + log.close(); + + // 4 Verify that checkpoint was modified on close + long after = FileUtils.checksumCRC32( checkPointMetaFile ); + Assert.assertFalse( before == after ); + } + private void takeAndVerify(FlumeEventPointer eventPointerIn, FlumeEvent eventIn) throws IOException, InterruptedException, NoopRecordException, CorruptEventException { @@ -479,4 +511,5 @@ public class TestLog { Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders()); Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody()); } + } http://git-wip-us.apache.org/repos/asf/flume/blob/2006a574/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 78b139e..43ca5db 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2638,6 +2638,7 @@ capacity 1000000 keep-alive 3 Amount of time (in sec) to wait for a put operation use-log-replay-v1 false Expert: Use old replay logic use-fast-replay false Expert: Replay without using queue +checkpointOnClose true Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay. encryption.activeKey -- Key name used to encrypt new data encryption.cipherProvider -- Cipher provider type, supported types: AESCTRNOPADDING encryption.keyProvider -- Key provider type, supported types: JCEKSFILE http://git-wip-us.apache.org/repos/asf/flume/blob/2006a574/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java index ac4dac4..a11126d 100644 --- a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java +++ b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java @@ -98,9 +98,20 @@ public class TestFileChannelIntegrityTool { } @Test + public void testFixCorruptRecordsWithCheckpoint() throws Exception { + doTestFixCorruptEvents(true); + } + + @Test + public void testFixCorruptRecords() throws Exception { + doTestFixCorruptEvents(false); + } + + @Test public void testFixInvalidRecords() throws Exception { doTestFixInvalidEvents(false, DummyEventVerifier.Builder.class.getName()); } + @Test public void testFixInvalidRecordsWithCheckpoint() throws Exception { doTestFixInvalidEvents(true, DummyEventVerifier.Builder.class.getName()); @@ -111,15 +122,24 @@ public class TestFileChannelIntegrityTool { tool.run(new String[] {"-l", dataDir.toString(), "-e", eventHandler, "-DvalidatorValue=0"}); FileChannel channel = new FileChannel(); channel.setName("channel"); - String cp; - if(withCheckpoint) { - cp = origCheckpointDir.toString(); + if (withCheckpoint) { + File[] cpFiles = origCheckpointDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if (name.contains("lock") || name.contains("queueset")) { + return false; + } + return true; + } + }); + for (File cpFile : cpFiles) { + Serialization.copyFile(cpFile, new File(checkpointDir, cpFile.getName())); + } } else { FileUtils.deleteDirectory(checkpointDir); Assert.assertTrue(checkpointDir.mkdirs()); - cp = checkpointDir.toString(); } - ctx.put(FileChannelConfiguration.CHECKPOINT_DIR,cp); + ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir.toString()); ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString()); channel.configure(ctx); channel.start(); @@ -136,15 +156,6 @@ public class TestFileChannelIntegrityTool { Assert.assertEquals(25 - invalidEvent, i); } - @Test - public void testFixCorruptRecords() throws Exception { - doTestFixCorruptEvents(false); - } - @Test - public void testFixCorruptRecordsWithCheckpoint() throws Exception { - doTestFixCorruptEvents(true); - } - public void doTestFixCorruptEvents(boolean withCheckpoint) throws Exception { Set<String> corruptFiles = new HashSet<String>(); File[] files = dataDir.listFiles(new FilenameFilter() { @@ -193,18 +204,27 @@ public class TestFileChannelIntegrityTool { } FileChannelIntegrityTool tool = new FileChannelIntegrityTool(); - tool.run(new String[] {"-l", dataDir.toString()}); + tool.run(new String[]{"-l", dataDir.toString()}); FileChannel channel = new FileChannel(); channel.setName("channel"); - String cp; - if(withCheckpoint) { - cp = origCheckpointDir.toString(); + if (withCheckpoint) { + File[] cpFiles = origCheckpointDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if (name.contains("lock") || name.contains("queueset")) { + return false; + } + return true; + } + }); + for (File cpFile : cpFiles) { + Serialization.copyFile(cpFile, new File(checkpointDir, cpFile.getName())); + } } else { FileUtils.deleteDirectory(checkpointDir); Assert.assertTrue(checkpointDir.mkdirs()); - cp = checkpointDir.toString(); } - ctx.put(FileChannelConfiguration.CHECKPOINT_DIR,cp); + ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, checkpointDir.toString()); ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString()); channel.configure(ctx); channel.start();
