FLUME-1528: File Channel replay when no checkpoint is present can be faster
(Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c249b55c Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c249b55c Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c249b55c Branch: refs/heads/cdh-1.2.0+24_intuit Commit: c249b55c0aa87bfd659a026a5f08b7c6932b94fd Parents: 0f52670 Author: Brock Noland <[email protected]> Authored: Fri Aug 31 11:19:57 2012 -0500 Committer: Mike Percy <[email protected]> Committed: Fri Sep 7 14:03:06 2012 -0700 ---------------------------------------------------------------------- .../flume/channel/file/CheckpointRebuilder.java | 251 +++++++++++++++ .../org/apache/flume/channel/file/FileChannel.java | 6 + .../channel/file/FileChannelConfiguration.java | 3 + .../java/org/apache/flume/channel/file/Log.java | 17 +- .../apache/flume/channel/file/ReplayHandler.java | 29 ++- .../apache/flume/channel/file/TestFileChannel.java | 27 ++- 6 files changed, 323 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java new file mode 100644 index 0000000..4db1b9c --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CheckpointRebuilder { + + private final File checkpointDir; + private final List<File> logFiles; + private final long maxFileSize; + private final FlumeEventQueue queue; + private final Set<ComparableFlumeEventPointer> committedPuts = + Sets.newHashSet(); + private final Set<ComparableFlumeEventPointer> pendingTakes = + Sets.newHashSet(); + private final SetMultimap<Long, ComparableFlumeEventPointer> uncommittedPuts = + HashMultimap.create(); + private final SetMultimap<Long, ComparableFlumeEventPointer> + uncommittedTakes = HashMultimap.create(); + + private static Logger LOG = + LoggerFactory.getLogger(CheckpointRebuilder.class); + + public CheckpointRebuilder(File checkpointDir, List<File> logFiles, + long maxFileSize, + FlumeEventQueue queue) throws IOException { + this.checkpointDir = checkpointDir; + this.logFiles = logFiles; + this.queue = queue; + this.maxFileSize = maxFileSize; + } + + public boolean rebuild() throws IOException, Exception { + File checkpointFile = new File(checkpointDir, "checkpoint"); + if (checkpointFile.exists()) { + LOG.info("Checkpoint file found, will not replay with fast logic."); + return false; + } + LOG.info("Attempting to fast replay the log files."); + List<LogFile.SequentialReader> logReaders = Lists.newArrayList(); + for (File logFile : logFiles) { + logReaders.add(new LogFile.SequentialReader(logFile)); + } + long transactionIDSeed = 0; + long writeOrderIDSeed = 0; + try { + for (LogFile.SequentialReader log : logReaders) { + LogRecord entry; + int fileID = log.getLogFileID(); + while ((entry = log.next()) != null) { + int offset = entry.getOffset(); + TransactionEventRecord record = entry.getEvent(); + long trans = record.getTransactionID(); + long writeOrderID = record.getLogWriteOrderID(); + transactionIDSeed = Math.max(trans, transactionIDSeed); + writeOrderIDSeed = Math.max(writeOrderID, writeOrderIDSeed); + if (record.getRecordType() == TransactionEventRecord.Type.PUT.get()) { + uncommittedPuts.put(record.getTransactionID(), + new ComparableFlumeEventPointer( + new FlumeEventPointer(fileID, offset), + record.getLogWriteOrderID())); + } else if (record.getRecordType() + == TransactionEventRecord.Type.TAKE.get()) { + Take take = (Take) record; + uncommittedTakes.put(record.getTransactionID(), + new ComparableFlumeEventPointer( + new FlumeEventPointer(take.getFileID(), take.getOffset()), + record.getLogWriteOrderID())); + } else if (record.getRecordType() + == TransactionEventRecord.Type.COMMIT.get()) { + Commit commit = (Commit) record; + if (commit.getType() + == TransactionEventRecord.Type.PUT.get()) { + Set<ComparableFlumeEventPointer> puts = + uncommittedPuts.get(record.getTransactionID()); + if (puts != null) { + for (ComparableFlumeEventPointer put : puts) { + if (!pendingTakes.remove(put)) { + committedPuts.add(put); + } + } + } + } else { + Set<ComparableFlumeEventPointer> takes = + uncommittedTakes.get(record.getTransactionID()); + if (takes != null) { + for (ComparableFlumeEventPointer take : takes) { + if (!committedPuts.remove(take)) { + pendingTakes.add(take); + } + } + } + } + } else if (record.getRecordType() + == TransactionEventRecord.Type.ROLLBACK.get()) { + if (uncommittedPuts.containsKey(record.getTransactionID())) { + uncommittedPuts.removeAll(record.getTransactionID()); + } else { + uncommittedTakes.removeAll(record.getTransactionID()); + } + } + } + } + } catch (Exception e) { + LOG.warn("Error while generating checkpoint " + + "using fast generation logic", e); + return false; + } finally { + TransactionIDOracle.setSeed(transactionIDSeed); + WriteOrderOracle.setSeed(writeOrderIDSeed); + for (LogFile.SequentialReader reader : logReaders) { + reader.close(); + } + } + Set<ComparableFlumeEventPointer> sortedPuts = + Sets.newTreeSet(committedPuts); + for (ComparableFlumeEventPointer put : sortedPuts) { + queue.addTail(put.pointer); + } + return true; + } + + private void writeCheckpoint() throws IOException { + long checkpointLogOrderID = 0; + List<LogFile.Writer> logWriters = Lists.newArrayList(); + for (File logFile : logFiles) { + String name = logFile.getName(); + logWriters.add(new LogFile.Writer(logFile, + Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)), + maxFileSize)); + } + try { + if (queue.checkpoint(true)) { + checkpointLogOrderID = queue.getLogWriteOrderID(); + for (LogFile.Writer logWriter : logWriters) { + logWriter.markCheckpoint(checkpointLogOrderID); + } + } + } catch (Exception e) { + LOG.warn("Error while generating checkpoint " + + "using fast generation logic", e); + } finally { + for (LogFile.Writer logWriter : logWriters) { + logWriter.close(); + } + } + } + + private class ComparableFlumeEventPointer + implements Comparable<ComparableFlumeEventPointer> { + + private final FlumeEventPointer pointer; + private final long orderID; + + public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID){ + this.pointer = pointer; + this.orderID = orderID; + } + + @Override + public int compareTo(ComparableFlumeEventPointer o) { + if (orderID < o.orderID) { + return -1; + } else { //Unfortunately same log order id does not mean same event + //for older logs. + return 1; + } + } + + @Override + public int hashCode(){ + return pointer.hashCode(); + } + + @Override + public boolean equals(Object o){ + return pointer.equals(o); + } + } + + public static void main(String[] args) throws Exception { + Options options = new Options(); + Option opt = new Option("c", true, "checkpoint directory"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option("l", true, "comma-separated list of log directories"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option("s", true, "maximum size of log files"); + opt.setRequired(true); + options.addOption(opt); + opt = new Option("t", true, "capacity of the channel"); + opt.setRequired(true); + options.addOption(opt); + CommandLineParser parser = new GnuParser(); + CommandLine cli = parser.parse(options, args); + File checkpointDir = new File(cli.getOptionValue("c")); + String[] logDirs = cli.getOptionValue("l").split(","); + List<File> logFiles = Lists.newArrayList(); + for (String logDir : logDirs) { + File[] files = new File(logDir).listFiles(); + logFiles.addAll(Arrays.asList(files)); + } + int capacity = Integer.parseInt(cli.getOptionValue("t")); + long maxFileSize = Long.parseLong(cli.getOptionValue("s")); + boolean isReplayV1 = cli.hasOption("v"); + FlumeEventQueue queue = new FlumeEventQueue(capacity, + new File(checkpointDir, "checkpoint"), + new File(checkpointDir, "inflighttakes"), + new File(checkpointDir, "inflightputs"), "channel"); + CheckpointRebuilder rebuilder = new CheckpointRebuilder(checkpointDir, + logFiles, maxFileSize, queue); + if(rebuilder.rebuild()) { + rebuilder.writeCheckpoint(); + } else { + LOG.error("Could not rebuild the checkpoint due to errors."); + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index 995bad5..5d588ea 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -86,6 +86,7 @@ public class FileChannel extends BasicChannelSemantics { private String channelNameDescriptor = "[channel=unknown]"; private ChannelCounter channelCounter; private boolean useLogReplayV1; + private boolean useFastReplay = false; @Override public synchronized void setName(String name) { @@ -193,6 +194,10 @@ public class FileChannel extends BasicChannelSemantics { FileChannelConfiguration.USE_LOG_REPLAY_V1, FileChannelConfiguration.DEFAULT_USE_LOG_REPLAY_V1); + useFastReplay = context.getBoolean( + FileChannelConfiguration.USE_FAST_REPLAY, + FileChannelConfiguration.DEFAULT_USE_FAST_REPLAY); + if(queueRemaining == null) { queueRemaining = new Semaphore(capacity, true); } @@ -220,6 +225,7 @@ public class FileChannel extends BasicChannelSemantics { builder.setChannelName(getName()); builder.setCheckpointWriteTimeout(checkpointWriteTimeout); builder.setUseLogReplayV1(useLogReplayV1); + builder.setUseFastReplay(useFastReplay); log = builder.build(); log.replay(); open = true; http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java index 9fc8df1..be2f633 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java @@ -77,4 +77,7 @@ public class FileChannelConfiguration { */ public static final String USE_LOG_REPLAY_V1 = "use-log-replay-v1"; public static final boolean DEFAULT_USE_LOG_REPLAY_V1 = false; + + public static final String USE_FAST_REPLAY = "use-fast-replay"; + public static final boolean DEFAULT_USE_FAST_REPLAY = false; } http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index b8f6570..e13ecc4 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -78,6 +78,7 @@ class Log { private FlumeEventQueue queue; private long checkpointInterval; private long maxFileSize; + private final boolean useFastReplay; private final Map<String, FileLock> locks; private final ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(true); @@ -107,6 +108,7 @@ class Log { private int bCheckpointWriteTimeout = FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT; private boolean useLogReplayV1; + private boolean useFastReplay; Builder setCheckpointInterval(long interval) { bCheckpointInterval = interval; @@ -152,16 +154,22 @@ class Log { return this; } + Builder setUseFastReplay(boolean useFastReplay){ + this.useFastReplay = useFastReplay; + return this; + } + Log build() throws IOException { return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity, bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName, - useLogReplayV1, bLogDirs); + useLogReplayV1, useFastReplay, bLogDirs); } } private Log(long checkpointInterval, long maxFileSize, int queueCapacity, int logWriteTimeout, int checkpointWriteTimeout, File checkpointDir, - String name, boolean useLogReplayV1, File... logDirs) + String name, boolean useLogReplayV1, boolean useFastReplay, + File... logDirs) throws IOException { Preconditions.checkArgument(checkpointInterval > 0, "checkpointInterval <= 0"); @@ -179,7 +187,7 @@ class Log { this.channelName = name; this.channelNameDescriptor = "[channel=" + name + "]"; this.useLogReplayV1 = useLogReplayV1; - + this.useFastReplay = useFastReplay; for (File logDir : logDirs) { Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(), "LogDir " + logDir + " could not be created"); @@ -270,7 +278,8 @@ class Log { * the queue, the timestamp the queue was written to disk, and * the list of data files. */ - ReplayHandler replayHandler = new ReplayHandler(queue); + ReplayHandler replayHandler = new ReplayHandler(queue, useFastReplay, + checkpointFile, maxFileSize); if(useLogReplayV1) { LOGGER.info("Replaying logs with v1 replay logic"); replayHandler.replayLogv1(dataFiles); http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java index 6f8af09..4e908ec 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java @@ -65,10 +65,17 @@ class ReplayHandler { * finding the put and commit in logdir2. */ private final List<Long> pendingTakes; + private final boolean useFastReplay; + private final File cpDir; + private final long maxFileSize; - ReplayHandler(FlumeEventQueue queue) { + ReplayHandler(FlumeEventQueue queue, boolean useFastReplay, File cpDir, + long maxFileSize) { this.queue = queue; + this.useFastReplay = useFastReplay; this.lastCheckpoint = queue.getLogWriteOrderID(); + this.cpDir = cpDir; + this.maxFileSize = maxFileSize; pendingTakes = Lists.newArrayList(); readers = Maps.newHashMap(); logRecordBuffer = new PriorityQueue<LogRecord>(); @@ -78,7 +85,15 @@ class ReplayHandler { * is failing on ol logs for some reason. */ @Deprecated - void replayLogv1(List<File> logs) throws IOException { + void replayLogv1(List<File> logs) throws Exception { + if(useFastReplay) { + CheckpointRebuilder rebuilder = new CheckpointRebuilder(cpDir, logs, + maxFileSize, queue); + if(rebuilder.rebuild()){ + LOG.info("Fast replay successful."); + return; + } + } int total = 0; int count = 0; MultiMap transactionMap = new MultiValueMap(); @@ -209,7 +224,15 @@ class ReplayHandler { * @param logs * @throws IOException */ - void replayLog(List<File> logs) throws IOException { + void replayLog(List<File> logs) throws Exception { + if (useFastReplay) { + CheckpointRebuilder rebuilder = new CheckpointRebuilder(cpDir, logs, + maxFileSize, queue); + if (rebuilder.rebuild()) { + LOG.info("Fast replay successful."); + return; + } + } int count = 0; MultiMap transactionMap = new MultiValueMap(); // seed both with the highest known sequence of either the tnxid or woid http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 0fd3176..35521d1 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -221,16 +221,30 @@ public class TestFileChannel { @Test public void testRestartLogReplayV1() throws Exception { - doTestRestart(true); + doTestRestart(true, false, false); } @Test public void testRestartLogReplayV2() throws Exception { - doTestRestart(false); + doTestRestart(false, false, false); } - public void doTestRestart(boolean useLogReplayV1) throws Exception { + + @Test + public void testFastReplayV1() throws Exception { + doTestRestart(true, true, true); + } + + @Test + public void testFastReplayV2() throws Exception { + doTestRestart(false, true, true); + } + public void doTestRestart(boolean useLogReplayV1, + boolean forceCheckpoint, boolean deleteCheckpoint) throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1, String.valueOf(useLogReplayV1)); + overrides.put( + FileChannelConfiguration.USE_FAST_REPLAY, + String.valueOf(deleteCheckpoint)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -243,7 +257,14 @@ public class TestFileChannel { Assert.assertEquals("Cannot acquire capacity. [channel=" +channel.getName()+"]", e.getMessage()); } + if (forceCheckpoint) { + forceCheckpoint(channel); + } channel.stop(); + if(deleteCheckpoint) { + File checkpoint = new File(checkpointDir, "checkpoint"); + checkpoint.delete(); + } channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen());
