Updated Branches: refs/heads/flume-1.3.0 e697bd168 -> 150c84541
FLUME-1583. FileChannel fast full replay will always be used if enabled (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/150c8454 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/150c8454 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/150c8454 Branch: refs/heads/flume-1.3.0 Commit: 150c84541b18331c79cd1372e0dcfa8d54e2d63c Parents: e697bd1 Author: Hari Shreedharan <[email protected]> Authored: Fri Sep 14 13:14:40 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Sep 14 13:15:31 2012 -0700 ---------------------------------------------------------------------- .../flume/channel/file/CheckpointRebuilder.java | 36 +++---- .../java/org/apache/flume/channel/file/Log.java | 32 +++++-- .../apache/flume/channel/file/ReplayHandler.java | 24 +---- .../channel/file/TestCheckpointRebuilder.java | 82 +++++++++++++++ .../flume/channel/file/TestFileChannelRestart.java | 10 ++ .../org/apache/flume/channel/file/TestUtils.java | 10 ++ 6 files changed, 146 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/150c8454/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java index 6e1d2fc..748f49a 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java @@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory; public class CheckpointRebuilder { - private final File checkpointDir; private final List<File> logFiles; private final FlumeEventQueue queue; private final Set<ComparableFlumeEventPointer> committedPuts = @@ -53,19 +52,13 @@ public class CheckpointRebuilder { private static Logger LOG = LoggerFactory.getLogger(CheckpointRebuilder.class); - public CheckpointRebuilder(File checkpointDir, List<File> logFiles, + public CheckpointRebuilder(List<File> logFiles, FlumeEventQueue queue) throws IOException { - this.checkpointDir = checkpointDir; this.logFiles = logFiles; this.queue = queue; } public boolean rebuild() throws IOException, Exception { - File checkpointFile = new File(checkpointDir, "checkpoint"); - if (checkpointFile.exists()) { - LOG.info("Checkpoint file found, will not replay with fast logic."); - return false; - } LOG.info("Attempting to fast replay the log files."); List<LogFile.SequentialReader> logReaders = Lists.newArrayList(); for (File logFile : logFiles) { @@ -243,18 +236,23 @@ public class CheckpointRebuilder { logFiles.addAll(Arrays.asList(files)); } int capacity = Integer.parseInt(cli.getOptionValue("t")); - EventQueueBackingStore backingStore = - EventQueueBackingStoreFactory.get(new File(checkpointDir, "checkpoint"), - capacity, "channel"); - FlumeEventQueue queue = new FlumeEventQueue(backingStore, - new File(checkpointDir, "inflighttakes"), - new File(checkpointDir, "inflightputs")); - CheckpointRebuilder rebuilder = new CheckpointRebuilder(checkpointDir, - logFiles, queue); - if(rebuilder.rebuild()) { - rebuilder.writeCheckpoint(); + File checkpointFile = new File(checkpointDir, "checkpoint"); + if(checkpointFile.exists()) { + LOG.error("Cannot execute fast replay", + new IllegalStateException("Checkpoint exists" + checkpointFile)); } else { - LOG.error("Could not rebuild the checkpoint due to errors."); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpointFile, + capacity, "channel"); + FlumeEventQueue queue = new FlumeEventQueue(backingStore, + new File(checkpointDir, "inflighttakes"), + new File(checkpointDir, "inflightputs")); + CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, queue); + if(rebuilder.rebuild()) { + rebuilder.writeCheckpoint(); + } else { + LOG.error("Could not rebuild the checkpoint due to errors."); + } } } } http://git-wip-us.apache.org/repos/asf/flume/blob/150c8454/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 1d91460..e36eafb 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -309,11 +309,22 @@ class Log { */ LogUtils.sort(dataFiles); + boolean useFastReplay = this.useFastReplay; /* * Read the checkpoint (in memory queue) from one of two alternating * locations. We will read the last one written to disk. */ File checkpointFile = new File(checkpointDir, "checkpoint"); + if(useFastReplay) { + if(checkpointFile.exists()) { + LOGGER.debug("Disabling fast full replay because checkpoint " + + "exists: " + checkpointFile); + useFastReplay = false; + } else { + LOGGER.debug("Not disabling fast full replay because checkpoint " + + " does not exist: " + checkpointFile); + } + } File inflightTakesFile = new File(checkpointDir, "inflighttakes"); File inflightPutsFile = new File(checkpointDir, "inflightputs"); EventQueueBackingStore backingStore = @@ -329,16 +340,23 @@ class Log { * the queue, the timestamp the queue was written to disk, and * the list of data files. */ - ReplayHandler replayHandler = new ReplayHandler(queue, - encryptionKeyProvider, useFastReplay, checkpointFile); - if(useLogReplayV1) { - LOGGER.info("Replaying logs with v1 replay logic"); - replayHandler.replayLogv1(dataFiles); + CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles, + queue); + if(useFastReplay && rebuilder.rebuild()) { + LOGGER.info("Fast replay successful."); } else { - LOGGER.info("Replaying logs with v2 replay logic"); - replayHandler.replayLog(dataFiles); + ReplayHandler replayHandler = new ReplayHandler(queue, + encryptionKeyProvider); + if(useLogReplayV1) { + LOGGER.info("Replaying logs with v1 replay logic"); + replayHandler.replayLogv1(dataFiles); + } else { + LOGGER.info("Replaying logs with v2 replay logic"); + replayHandler.replayLog(dataFiles); + } } + for (int index = 0; index < logDirs.length; index++) { LOGGER.info("Rolling " + logDirs[index]); roll(index); http://git-wip-us.apache.org/repos/asf/flume/blob/150c8454/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java index 7c32526..81f6172 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java @@ -69,15 +69,11 @@ class ReplayHandler { * finding the put and commit in logdir2. */ private final List<Long> pendingTakes; - private final boolean useFastReplay; - private final File cpDir; - ReplayHandler(FlumeEventQueue queue, @Nullable KeyProvider encryptionKeyProvider, - boolean useFastReplay, File cpDir) { + ReplayHandler(FlumeEventQueue queue, + @Nullable KeyProvider encryptionKeyProvider) { this.queue = queue; - this.useFastReplay = useFastReplay; this.lastCheckpoint = queue.getLogWriteOrderID(); - this.cpDir = cpDir; pendingTakes = Lists.newArrayList(); readers = Maps.newHashMap(); logRecordBuffer = new PriorityQueue<LogRecord>(); @@ -89,14 +85,6 @@ class ReplayHandler { */ @Deprecated void replayLogv1(List<File> logs) throws Exception { - if(useFastReplay) { - CheckpointRebuilder rebuilder = new CheckpointRebuilder(cpDir, logs, - queue); - if(rebuilder.rebuild()){ - LOG.info("Fast replay successful."); - return; - } - } int total = 0; int count = 0; MultiMap transactionMap = new MultiValueMap(); @@ -228,14 +216,6 @@ class ReplayHandler { * @throws IOException */ void replayLog(List<File> logs) throws Exception { - if (useFastReplay) { - CheckpointRebuilder rebuilder = new CheckpointRebuilder(cpDir, logs, - queue); - if (rebuilder.rebuild()) { - LOG.info("Fast replay successful."); - return; - } - } int count = 0; MultiMap transactionMap = new MultiValueMap(); // seed both with the highest known sequence of either the tnxid or woid http://git-wip-us.apache.org/repos/asf/flume/blob/150c8454/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java new file mode 100644 index 0000000..ffc4623 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import static org.apache.flume.channel.file.TestUtils.*; + +import java.io.File; +import java.util.Map; +import java.util.Set; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +public class TestCheckpointRebuilder extends TestFileChannelBase { + + protected static final Logger LOG = LoggerFactory + .getLogger(TestCheckpointRebuilder.class); + + @Before + public void setup() throws Exception { + super.setup(); + } + + @After + public void teardown() { + super.teardown(); + } + @Test + public void testFastReplay() throws Exception { + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.CAPACITY, + String.valueOf(50)); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> in = fillChannel(channel, "checkpointBulder"); + channel.stop(); + File checkpointFile = new File(checkpointDir, "checkpoint"); + File metaDataFile = Serialization.getMetaDataFile(checkpointFile); + File inflightTakesFile = new File(checkpointDir, "inflighttakes"); + File inflightPutsFile = new File(checkpointDir, "inflightputs"); + Assert.assertTrue(checkpointFile.delete()); + Assert.assertTrue(metaDataFile.delete()); + Assert.assertTrue(inflightTakesFile.delete()); + Assert.assertTrue(inflightPutsFile.delete()); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpointFile, 50, + "test"); + FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakesFile, + inflightPutsFile); + CheckpointRebuilder checkpointRebuilder = + new CheckpointRebuilder(getAllLogs(dataDirs), queue); + Assert.assertTrue(checkpointRebuilder.rebuild()); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> out = consumeChannel(channel); + compareInputAndOut(in, out); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/150c8454/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java index 4133573..68285cc 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -66,6 +66,16 @@ public class TestFileChannelRestart extends TestFileChannelBase { } @Test + public void testFastReplayNegativeTestV1() throws Exception { + doTestRestart(true, true, false, true); + } + + @Test + public void testFastReplayNegativeTestV2() throws Exception { + doTestRestart(false, true, false, true); + } + + @Test public void testNormalReplayV1() throws Exception { doTestRestart(true, true, true, false); } http://git-wip-us.apache.org/repos/asf/flume/blob/150c8454/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java index 2b88b96..8807201 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.net.URL; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -45,6 +46,7 @@ import org.apache.hadoop.io.Writable; import org.junit.Assert; import com.google.common.base.Charsets; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; @@ -105,6 +107,14 @@ public class TestUtils { return events; } + public static List<File> getAllLogs(File[] dataDirs) { + List<File> result = Lists.newArrayList(); + for(File dataDir : dataDirs) { + result.addAll(LogUtils.getLogs(dataDir)); + } + return result; + } + public static void forceCheckpoint(FileChannel channel) { Log log = field("log") .ofType(Log.class)
