FLUME-1528: File Channel replay when no checkpoint is present can be faster

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c249b55c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c249b55c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c249b55c

Branch: refs/heads/cdh-1.2.0+24_intuit
Commit: c249b55c0aa87bfd659a026a5f08b7c6932b94fd
Parents: 0f52670
Author: Brock Noland <[email protected]>
Authored: Fri Aug 31 11:19:57 2012 -0500
Committer: Mike Percy <[email protected]>
Committed: Fri Sep 7 14:03:06 2012 -0700

----------------------------------------------------------------------
 .../flume/channel/file/CheckpointRebuilder.java    |  251 +++++++++++++++
 .../org/apache/flume/channel/file/FileChannel.java |    6 +
 .../channel/file/FileChannelConfiguration.java     |    3 +
 .../java/org/apache/flume/channel/file/Log.java    |   17 +-
 .../apache/flume/channel/file/ReplayHandler.java   |   29 ++-
 .../apache/flume/channel/file/TestFileChannel.java |   27 ++-
 6 files changed, 323 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
new file mode 100644
index 0000000..4db1b9c
--- /dev/null
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CheckpointRebuilder {
+
+  private final File checkpointDir;
+  private final List<File> logFiles;
+  private final long maxFileSize;
+  private final FlumeEventQueue queue;
+  private final Set<ComparableFlumeEventPointer> committedPuts =
+          Sets.newHashSet();
+  private final Set<ComparableFlumeEventPointer> pendingTakes =
+          Sets.newHashSet();
+  private final SetMultimap<Long, ComparableFlumeEventPointer> uncommittedPuts =
+          HashMultimap.create();
+  private final SetMultimap<Long, ComparableFlumeEventPointer>
+          uncommittedTakes = HashMultimap.create();
+
+  private static Logger LOG =
+          LoggerFactory.getLogger(CheckpointRebuilder.class);
+
+  public CheckpointRebuilder(File checkpointDir, List<File> logFiles,
+          long maxFileSize,
+          FlumeEventQueue queue) throws IOException {
+    this.checkpointDir = checkpointDir;
+    this.logFiles = logFiles;
+    this.queue = queue;
+    this.maxFileSize = maxFileSize;
+  }
+
+  public boolean rebuild() throws IOException, Exception {
+    File checkpointFile = new File(checkpointDir, "checkpoint");
+    if (checkpointFile.exists()) {
+      LOG.info("Checkpoint file found, will not replay with fast logic.");
+      return false;
+    }
+    LOG.info("Attempting to fast replay the log files.");
+    List<LogFile.SequentialReader> logReaders = Lists.newArrayList();
+    for (File logFile : logFiles) {
+      logReaders.add(new LogFile.SequentialReader(logFile));
+    }
+    long transactionIDSeed = 0;
+    long writeOrderIDSeed = 0;
+    try {
+      for (LogFile.SequentialReader log : logReaders) {
+        LogRecord entry;
+        int fileID = log.getLogFileID();
+        while ((entry = log.next()) != null) {
+          int offset = entry.getOffset();
+          TransactionEventRecord record = entry.getEvent();
+          long trans = record.getTransactionID();
+          long writeOrderID = record.getLogWriteOrderID();
+            transactionIDSeed = Math.max(trans, transactionIDSeed);
+            writeOrderIDSeed = Math.max(writeOrderID, writeOrderIDSeed);
+          if (record.getRecordType() == TransactionEventRecord.Type.PUT.get()) {
+            uncommittedPuts.put(record.getTransactionID(),
+                    new ComparableFlumeEventPointer(
+                    new FlumeEventPointer(fileID, offset),
+                    record.getLogWriteOrderID()));
+          } else if (record.getRecordType()
+                  == TransactionEventRecord.Type.TAKE.get()) {
+            Take take = (Take) record;
+            uncommittedTakes.put(record.getTransactionID(),
+                    new ComparableFlumeEventPointer(
+                    new FlumeEventPointer(take.getFileID(), take.getOffset()),
+                    record.getLogWriteOrderID()));
+          } else if (record.getRecordType()
+                  == TransactionEventRecord.Type.COMMIT.get()) {
+            Commit commit = (Commit) record;
+            if (commit.getType()
+                    == TransactionEventRecord.Type.PUT.get()) {
+              Set<ComparableFlumeEventPointer> puts =
+                      uncommittedPuts.get(record.getTransactionID());
+              if (puts != null) {
+                for (ComparableFlumeEventPointer put : puts) {
+                  if (!pendingTakes.remove(put)) {
+                    committedPuts.add(put);
+                  }
+                }
+              }
+            } else {
+              Set<ComparableFlumeEventPointer> takes =
+                      uncommittedTakes.get(record.getTransactionID());
+              if (takes != null) {
+                for (ComparableFlumeEventPointer take : takes) {
+                  if (!committedPuts.remove(take)) {
+                    pendingTakes.add(take);
+                  }
+                }
+              }
+            }
+          } else if (record.getRecordType()
+                  == TransactionEventRecord.Type.ROLLBACK.get()) {
+            if (uncommittedPuts.containsKey(record.getTransactionID())) {
+              uncommittedPuts.removeAll(record.getTransactionID());
+            } else {
+              uncommittedTakes.removeAll(record.getTransactionID());
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Error while generating checkpoint "
+              + "using fast generation logic", e);
+      return false;
+    } finally {
+        TransactionIDOracle.setSeed(transactionIDSeed);
+        WriteOrderOracle.setSeed(writeOrderIDSeed);
+      for (LogFile.SequentialReader reader : logReaders) {
+        reader.close();
+      }
+    }
+    Set<ComparableFlumeEventPointer> sortedPuts =
+            Sets.newTreeSet(committedPuts);
+    for (ComparableFlumeEventPointer put : sortedPuts) {
+      queue.addTail(put.pointer);
+    }
+    return true;
+  }
+
+  private void writeCheckpoint() throws IOException {
+    long checkpointLogOrderID = 0;
+    List<LogFile.Writer> logWriters = Lists.newArrayList();
+    for (File logFile : logFiles) {
+        String name = logFile.getName();
+        logWriters.add(new LogFile.Writer(logFile,
+                Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)),
+                maxFileSize));
+    }
+    try {
+      if (queue.checkpoint(true)) {
+        checkpointLogOrderID = queue.getLogWriteOrderID();
+        for (LogFile.Writer logWriter : logWriters) {
+          logWriter.markCheckpoint(checkpointLogOrderID);
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Error while generating checkpoint "
+              + "using fast generation logic", e);
+    } finally {
+      for (LogFile.Writer logWriter : logWriters) {
+        logWriter.close();
+      }
+    }
+  }
+
+  private class ComparableFlumeEventPointer
+          implements Comparable<ComparableFlumeEventPointer> {
+
+    private final FlumeEventPointer pointer;
+    private final long orderID;
+
+    public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID){
+      this.pointer = pointer;
+      this.orderID = orderID;
+    }
+
+    @Override
+    public int compareTo(ComparableFlumeEventPointer o) {
+      if (orderID < o.orderID) {
+        return -1;
+      } else { //Unfortunately same log order id does not mean same event
+        //for older logs.
+        return 1;
+      }
+    }
+
+    @Override
+    public int hashCode(){
+      return pointer.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o){
+      return pointer.equals(o);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Options options = new Options();
+    Option opt = new Option("c", true, "checkpoint directory");
+    opt.setRequired(true);
+    options.addOption(opt);
+    opt = new Option("l", true, "comma-separated list of log directories");
+    opt.setRequired(true);
+    options.addOption(opt);
+    opt = new Option("s", true, "maximum size of log files");
+    opt.setRequired(true);
+    options.addOption(opt);
+    opt = new Option("t", true, "capacity of the channel");
+    opt.setRequired(true);
+    options.addOption(opt);
+    CommandLineParser parser = new GnuParser();
+    CommandLine cli = parser.parse(options, args);
+    File checkpointDir = new File(cli.getOptionValue("c"));
+    String[] logDirs = cli.getOptionValue("l").split(",");
+    List<File> logFiles = Lists.newArrayList();
+    for (String logDir : logDirs) {
+      File[] files = new File(logDir).listFiles();
+      logFiles.addAll(Arrays.asList(files));
+    }
+    int capacity = Integer.parseInt(cli.getOptionValue("t"));
+    long maxFileSize = Long.parseLong(cli.getOptionValue("s"));
+    boolean isReplayV1 = cli.hasOption("v");
+    FlumeEventQueue queue = new FlumeEventQueue(capacity,
+            new File(checkpointDir, "checkpoint"),
+            new File(checkpointDir, "inflighttakes"),
+            new File(checkpointDir, "inflightputs"), "channel");
+    CheckpointRebuilder rebuilder = new CheckpointRebuilder(checkpointDir,
+            logFiles, maxFileSize, queue);
+    if(rebuilder.rebuild()) {
+      rebuilder.writeCheckpoint();
+    } else {
+      LOG.error("Could not rebuild the checkpoint due to errors.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 995bad5..5d588ea 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -86,6 +86,7 @@ public class FileChannel extends BasicChannelSemantics {
   private String channelNameDescriptor = "[channel=unknown]";
   private ChannelCounter channelCounter;
   private boolean useLogReplayV1;
+  private boolean useFastReplay = false;
 
   @Override
   public synchronized void setName(String name) {
@@ -193,6 +194,10 @@ public class FileChannel extends BasicChannelSemantics {
         FileChannelConfiguration.USE_LOG_REPLAY_V1,
           FileChannelConfiguration.DEFAULT_USE_LOG_REPLAY_V1);
 
+    useFastReplay = context.getBoolean(
+            FileChannelConfiguration.USE_FAST_REPLAY,
+            FileChannelConfiguration.DEFAULT_USE_FAST_REPLAY);
+
     if(queueRemaining == null) {
       queueRemaining = new Semaphore(capacity, true);
     }
@@ -220,6 +225,7 @@ public class FileChannel extends BasicChannelSemantics {
       builder.setChannelName(getName());
       builder.setCheckpointWriteTimeout(checkpointWriteTimeout);
       builder.setUseLogReplayV1(useLogReplayV1);
+      builder.setUseFastReplay(useFastReplay);
       log = builder.build();
       log.replay();
       open = true;

http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
index 9fc8df1..be2f633 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -77,4 +77,7 @@ public class FileChannelConfiguration {
    */
   public static final String USE_LOG_REPLAY_V1 = "use-log-replay-v1";
   public static final boolean DEFAULT_USE_LOG_REPLAY_V1 = false;
+
+  public static final String USE_FAST_REPLAY = "use-fast-replay";
+  public static final boolean DEFAULT_USE_FAST_REPLAY = false;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index b8f6570..e13ecc4 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -78,6 +78,7 @@ class Log {
   private FlumeEventQueue queue;
   private long checkpointInterval;
   private long maxFileSize;
+  private final boolean useFastReplay;
   private final Map<String, FileLock> locks;
   private final ReentrantReadWriteLock checkpointLock =
       new ReentrantReadWriteLock(true);
@@ -107,6 +108,7 @@ class Log {
     private int bCheckpointWriteTimeout =
         FileChannelConfiguration.DEFAULT_CHECKPOINT_WRITE_TIMEOUT;
     private boolean useLogReplayV1;
+    private boolean useFastReplay;
 
     Builder setCheckpointInterval(long interval) {
       bCheckpointInterval = interval;
@@ -152,16 +154,22 @@ class Log {
       return this;
     }
 
+    Builder setUseFastReplay(boolean useFastReplay){
+      this.useFastReplay = useFastReplay;
+      return this;
+    }
+
     Log build() throws IOException {
       return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
           bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName,
-          useLogReplayV1, bLogDirs);
+          useLogReplayV1, useFastReplay, bLogDirs);
     }
   }
 
   private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
       int logWriteTimeout, int checkpointWriteTimeout, File checkpointDir,
-      String name, boolean useLogReplayV1, File... logDirs)
+      String name, boolean useLogReplayV1, boolean useFastReplay,
+      File... logDirs)
           throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
         "checkpointInterval <= 0");
@@ -179,7 +187,7 @@ class Log {
     this.channelName = name;
     this.channelNameDescriptor = "[channel=" + name + "]";
     this.useLogReplayV1 = useLogReplayV1;
-
+    this.useFastReplay = useFastReplay;
     for (File logDir : logDirs) {
       Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(),
           "LogDir " + logDir + " could not be created");
@@ -270,7 +278,8 @@ class Log {
        * the queue, the timestamp the queue was written to disk, and
        * the list of data files.
        */
-      ReplayHandler replayHandler = new ReplayHandler(queue);
+      ReplayHandler replayHandler = new ReplayHandler(queue, useFastReplay,
+              checkpointFile, maxFileSize);
       if(useLogReplayV1) {
         LOGGER.info("Replaying logs with v1 replay logic");
         replayHandler.replayLogv1(dataFiles);

http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index 6f8af09..4e908ec 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -65,10 +65,17 @@ class ReplayHandler {
    * finding the put and commit in logdir2.
    */
   private final List<Long> pendingTakes;
+  private final boolean useFastReplay;
+  private final File cpDir;
+  private final long maxFileSize;
 
-  ReplayHandler(FlumeEventQueue queue) {
+  ReplayHandler(FlumeEventQueue queue, boolean useFastReplay, File cpDir,
+          long maxFileSize) {
     this.queue = queue;
+    this.useFastReplay = useFastReplay;
     this.lastCheckpoint = queue.getLogWriteOrderID();
+    this.cpDir = cpDir;
+    this.maxFileSize = maxFileSize;
     pendingTakes = Lists.newArrayList();
     readers = Maps.newHashMap();
     logRecordBuffer = new PriorityQueue<LogRecord>();
@@ -78,7 +85,15 @@ class ReplayHandler {
    * is failing on ol logs for some reason.
    */
   @Deprecated
-  void replayLogv1(List<File> logs) throws IOException {
+  void replayLogv1(List<File> logs) throws Exception {
+    if(useFastReplay) {
+      CheckpointRebuilder rebuilder = new CheckpointRebuilder(cpDir, logs,
+              maxFileSize, queue);
+      if(rebuilder.rebuild()){
+        LOG.info("Fast replay successful.");
+        return;
+      }
+    }
     int total = 0;
     int count = 0;
     MultiMap transactionMap = new MultiValueMap();
@@ -209,7 +224,15 @@ class ReplayHandler {
    * @param logs
    * @throws IOException
    */
-  void replayLog(List<File> logs) throws IOException {
+  void replayLog(List<File> logs) throws Exception {
+    if (useFastReplay) {
+      CheckpointRebuilder rebuilder = new CheckpointRebuilder(cpDir, logs,
+              maxFileSize, queue);
+      if (rebuilder.rebuild()) {
+        LOG.info("Fast replay successful.");
+        return;
+      }
+    }
     int count = 0;
     MultiMap transactionMap = new MultiValueMap();
     // seed both with the highest known sequence of either the tnxid or woid

http://git-wip-us.apache.org/repos/asf/flume/blob/c249b55c/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 0fd3176..35521d1 100644
--- 
a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -221,16 +221,30 @@ public class TestFileChannel {
 
   @Test
   public void testRestartLogReplayV1() throws Exception {
-    doTestRestart(true);
+    doTestRestart(true, false, false);
   }
   @Test
   public void testRestartLogReplayV2() throws Exception {
-    doTestRestart(false);
+    doTestRestart(false, false, false);
   }
-  public void doTestRestart(boolean useLogReplayV1) throws Exception {
+
+  @Test
+  public void testFastReplayV1() throws Exception {
+    doTestRestart(true, true, true);
+  }
+
+  @Test
+  public void testFastReplayV2() throws Exception {
+    doTestRestart(false, true, true);
+  }
+  public void doTestRestart(boolean useLogReplayV1,
+          boolean forceCheckpoint, boolean deleteCheckpoint) throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1,
             String.valueOf(useLogReplayV1));
+    overrides.put(
+            FileChannelConfiguration.USE_FAST_REPLAY,
+            String.valueOf(deleteCheckpoint));
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
@@ -243,7 +257,14 @@ public class TestFileChannel {
       Assert.assertEquals("Cannot acquire capacity. [channel="
           +channel.getName()+"]", e.getMessage());
     }
+    if (forceCheckpoint) {
+      forceCheckpoint(channel);
+    }
     channel.stop();
+    if(deleteCheckpoint) {
+      File checkpoint = new File(checkpointDir, "checkpoint");
+      checkpoint.delete();
+    }
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());

Reply via email to