Updated Branches:
  refs/heads/trunk d76118d72 -> 6373032a6
FLUME-2155. Index the Flume Event Queue during replay to improve replay time.

(Brock Noland via Hari Shreedharan)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6373032a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6373032a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6373032a

Branch: refs/heads/trunk
Commit: 6373032a620bdc687b6d03b12726713d08c71a10
Parents: d76118d
Author: Hari Shreedharan <hshreedha...@apache.org>
Authored: Fri Dec 13 12:35:43 2013 -0800
Committer: Hari Shreedharan <hshreedha...@apache.org>
Committed: Fri Dec 13 12:35:43 2013 -0800

----------------------------------------------------------------------
 flume-ng-channels/flume-file-channel/pom.xml    |   5 +
 .../flume/channel/file/CheckpointRebuilder.java |   4 +-
 .../file/EventQueueBackingStoreFile.java        |   6 +-
 .../apache/flume/channel/file/FileChannel.java  |   1 -
 .../flume/channel/file/FlumeEventQueue.java     | 106 +++++++++++++++++--
 .../java/org/apache/flume/channel/file/Log.java |   9 +-
 .../org/apache/flume/channel/file/LogFile.java  |   1 -
 .../flume/channel/file/ReplayHandler.java       |  18 ++--
 .../flume/channel/file/Serialization.java       |   2 +-
 .../flume/channel/file/TestCheckpoint.java      |   8 +-
 .../channel/file/TestCheckpointRebuilder.java   |   3 +-
 .../file/TestEventQueueBackingStoreFactory.java |   4 +-
 .../flume/channel/file/TestFlumeEventQueue.java | 103 ++++++++++++++----
 pom.xml                                         |   6 ++
 14 files changed, 221 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/pom.xml b/flume-ng-channels/flume-file-channel/pom.xml
index e055d8a..eacd329 100644
--- a/flume-ng-channels/flume-file-channel/pom.xml
+++ b/flume-ng-channels/flume-file-channel/pom.xml
@@ -102,6 +102,11 @@
       <scope>compile</scope>
     </dependency>

+    <dependency>
+      <groupId>org.mapdb</groupId>
+      <artifactId>mapdb</artifactId>
+    </dependency>
+
   </dependencies>

   <profiles>


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index 7883d0e..4388181 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -27,7 +27,6 @@ import com.google.common.collect.Sets;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.apache.commons.cli.CommandLine;
@@ -251,7 +250,8 @@ public class CheckpointRebuilder {
         capacity, "channel");
     FlumeEventQueue queue = new FlumeEventQueue(backingStore,
         new File(checkpointDir, "inflighttakes"),
-        new File(checkpointDir, "inflightputs"));
+        new File(checkpointDir, "inflightputs"),
+        new File(checkpointDir, Log.QUEUE_SET));
     CheckpointRebuilder rebuilder = new CheckpointRebuilder(logFiles, queue);
     if(rebuilder.rebuild()) {
      rebuilder.writeCheckpoint();


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 2366cbc..8a9fdae 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -166,7 +166,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
         "from the checkpoint directory. Cannot complete backup of the "
             + "checkpoint.");
     for (File origFile : checkpointFiles) {
-      if(origFile.getName().equals(Log.FILE_LOCK)) {
+      if(Log.EXCLUDES.contains(origFile.getName())) {
        continue;
       }
       Serialization.copyFile(origFile, new File(backupDirectory,
@@ -399,6 +399,7 @@
     File file = new File(args[0]);
     File inflightTakesFile = new File(args[1]);
     File inflightPutsFile = new File(args[2]);
+    File queueSetDir = new File(args[3]);
     if (!file.exists()) {
       throw new IOException("File " + file + " does not exist");
     }
@@ -421,7 +422,8 @@
           + fileID + ", offset = " + offset);
     }
     FlumeEventQueue queue =
-        new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile);
+        new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile,
+            queueSetDir);
     SetMultimap<Long, Long> putMap = queue.deserializeInflightPuts();
     System.out.println("Inflight Puts:");


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index 36f150b..2cd7f03 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -100,7 +100,6 @@ public class FileChannel extends BasicChannelSemantics {
   private String encryptionActiveKey;
   private String encryptionCipherProvider;
   private boolean useDualCheckpoints;
-  private boolean isTest = false;

   @Override
   public synchronized void setName(String name) {


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index ac03fb4..7888b41 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -30,11 +30,15 @@ import java.util.Collection;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.Future;
+
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.ArrayUtils;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.SetMultimap;
@@ -56,16 +60,26 @@ final class FlumeEventQueue {
   private final String channelNameDescriptor;
   private final InflightEventWrapper inflightTakes;
   private final InflightEventWrapper inflightPuts;
+  private long searchTime = 0;
+  private long searchCount = 0;
+  private long copyTime = 0;
+  private long copyCount = 0;
+  private DB db;
+  private Set<Long> queueSet;

   /**
    * @param capacity max event capacity of queue
    * @throws IOException
    */
   FlumeEventQueue(EventQueueBackingStore backingStore, File inflightTakesFile,
-      File inflightPutsFile) throws Exception {
+      File inflightPutsFile, File queueSetDBDir) throws Exception {
     Preconditions.checkArgument(backingStore.getCapacity() > 0,
         "Capacity must be greater than zero");
+    Preconditions.checkNotNull(backingStore, "backingStore");
     this.channelNameDescriptor = "[channel=" + backingStore.getName() + "]";
+    Preconditions.checkNotNull(inflightTakesFile, "inflightTakesFile");
+    Preconditions.checkNotNull(inflightPutsFile, "inflightPutsFile");
+    Preconditions.checkNotNull(queueSetDBDir, "queueSetDBDir");
     this.backingStore = backingStore;
     try {
       inflightPuts = new InflightEventWrapper(inflightPutsFile);
@@ -74,6 +88,32 @@ final class FlumeEventQueue {
       LOG.error("Could not read checkpoint.", e);
       throw e;
     }
+    if(queueSetDBDir.isDirectory()) {
+      FileUtils.deleteDirectory(queueSetDBDir);
+    } else if(queueSetDBDir.isFile() && !queueSetDBDir.delete()) {
+      throw new IOException("QueueSetDir " + queueSetDBDir + " is a file and"
+          + " could not be deleted");
+    }
+    if(!queueSetDBDir.mkdirs()) {
+      throw new IllegalStateException("Could not create QueueSet Dir "
+          + queueSetDBDir);
+    }
+    File dbFile = new File(queueSetDBDir, "db");
+    db = DBMaker.newFileDB(dbFile)
+        .closeOnJvmShutdown()
+        .transactionDisable()
+        .syncOnCommitDisable()
+        .deleteFilesAfterClose()
+        .cacheDisable()
+        .randomAccessFileEnableIfNeeded()
+        .make();
+    queueSet = db.createTreeSet("QueueSet").make();
+    long start = System.currentTimeMillis();
+    for (int i = 0; i < backingStore.getSize(); i++) {
+      queueSet.add(get(i));
+    }
+    LOG.info("QueueSet population inserting " + backingStore.getSize()
+        + " took " + (System.currentTimeMillis() - start));
   }

   SetMultimap<Long, Long> deserializeInflightPuts()
@@ -182,8 +222,10 @@
   }

   /**
-   * Remove FlumeEventPointer from queue, will normally
-   * only be used when recovering from a crash
+   * Remove FlumeEventPointer from queue, will
+   * only be used when recovering from a crash. It is not
+   * legal to call this method after replayComplete has been
+   * called.
   * @param FlumeEventPointer to be removed
   * @return true if the FlumeEventPointer was found
   * and removed
@@ -191,14 +233,25 @@
   synchronized boolean remove(FlumeEventPointer e) {
     long value = e.toLong();
     Preconditions.checkArgument(value != EMPTY);
+    if (queueSet == null) {
+      throw new IllegalStateException("QueueSet is null, thus replayComplete"
+          + " has been called which is illegal");
+    }
+    if (!queueSet.contains(value)) {
+      return false;
+    }
+    searchCount++;
+    long start = System.currentTimeMillis();
     for (int i = 0; i < backingStore.getSize(); i++) {
       if(get(i) == value) {
         remove(i, 0);
         FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
         backingStore.decrementFileID(ptr.getFileID());
+        searchTime += System.currentTimeMillis() - start;
         return true;
       }
     }
+    searchTime += System.currentTimeMillis() - start;
     return false;
   }
   /**
@@ -261,6 +314,9 @@
       }
     }
     set(index, value);
+    if (queueSet != null) {
+      queueSet.add(value);
+    }
     return true;
   }

@@ -279,7 +335,12 @@
       throw new IndexOutOfBoundsException("index = " + index
          + ", queueSize " + backingStore.getSize() +" " + channelNameDescriptor);
     }
+    copyCount++;
+    long start = System.currentTimeMillis();
     long value = get(index);
+    if (queueSet != null) {
+      queueSet.remove(value);
+    }
     //if txn id = 0, we are recovering from a crash.
     if(transactionID != 0) {
       inflightTakes.addEvent(transactionID, value);
@@ -304,10 +365,10 @@
       }
     }
     backingStore.setSize(backingStore.getSize() - 1);
+    copyTime += System.currentTimeMillis() - start;
     return value;
   }
-
   protected synchronized int getSize() {
     return backingStore.getSize() + inflightTakes.getSize();
   }
@@ -321,6 +382,13 @@
   synchronized void close() throws IOException {
     try {
+      if (db != null) {
+        db.close();
+      }
+    } catch(Exception ex) {
+      LOG.warn("Error closing db", ex);
+    }
+    try {
       backingStore.close();
       inflightPuts.close();
       inflightTakes.close();
@@ -328,6 +396,33 @@
       LOG.warn("Error closing backing store", e);
     }
   }
+
+  /**
+   * Called when ReplayHandler has completed and thus remove(FlumeEventPointer)
+   * will no longer be called.
+   */
+  synchronized void replayComplete() {
+    String msg = "Search Count = " + searchCount + ", Search Time = "
+        + searchTime + ", Copy Count = " + copyCount + ", Copy Time = "
+        + copyTime;
+    LOG.info(msg);
+    if(db != null) {
+      db.close();
+    }
+    queueSet = null;
+    db = null;
+  }
+
+  @VisibleForTesting
+  long getSearchCount() {
+    return searchCount;
+  }
+
+  @VisibleForTesting
+  long getCopyCount() {
+    return copyCount;
+  }
+
   /**
    * A representation of in flight events which have not yet been committed.
   * None of the methods are thread safe, and should be called from thread
@@ -340,7 +435,6 @@
    private volatile RandomAccessFile file;
    private volatile java.nio.channels.FileChannel fileChannel;
    private final MessageDigest digest;
-    private volatile Future<?> future;
    private final File inflightEventsFile;
    private volatile boolean syncRequired = false;
    private SetMultimap<Long, Integer> inflightFileIDs = HashMultimap.create();


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 8a8cb7f..70106cb 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -78,6 +78,7 @@ public class Log {
   private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
   private static final int MIN_NUM_LOGS = 2;
   public static final String FILE_LOCK = "in_use.lock";
+  public static final String QUEUE_SET = "queueset";
   // for reader
   private final Map<Integer, LogFile.RandomReader> idLogFileMap = Collections
       .synchronizedMap(new HashMap<Integer, LogFile.RandomReader>());
@@ -103,7 +104,8 @@ public class Log {
   /**
    * Set of files that should be excluded from backup and restores.
    */
-  public static final Set<String> EXCLUDES = Sets.newHashSet(FILE_LOCK);
+  public static final Set<String> EXCLUDES = Sets.newHashSet(FILE_LOCK,
+      QUEUE_SET);
   /**
    * Shared lock
    */
@@ -405,6 +407,7 @@ public class Log {
     }
     File inflightTakesFile = new File(checkpointDir, "inflighttakes");
     File inflightPutsFile = new File(checkpointDir, "inflightputs");
+    File queueSetDir = new File(checkpointDir, QUEUE_SET);

     EventQueueBackingStore backingStore = null;

@@ -414,7 +417,7 @@ public class Log {
           backupCheckpointDir, queueCapacity, channelNameDescriptor, true,
          this.useDualCheckpoints);
      queue = new FlumeEventQueue(backingStore, inflightTakesFile,
-          inflightPutsFile);
+          inflightPutsFile, queueSetDir);
      LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
          + ", queue depth = " + queue.getSize());

@@ -450,7 +453,7 @@ public class Log {
           backupCheckpointDir, queueCapacity, channelNameDescriptor, true,
          useDualCheckpoints);
      queue = new FlumeEventQueue(backingStore, inflightTakesFile,
-          inflightPutsFile);
+          inflightPutsFile, queueSetDir);
      // If the checkpoint was deleted due to BadCheckpointException, then
      // trigger fast replay if the channel is configured to.
      shouldFastReplay = this.useFastReplay;


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index 62f68c6..26a24b1 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.flume.ChannelException;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.channel.file.encryption.CipherProvider;


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index c8f5fdd..e668c2e 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -26,7 +26,6 @@ import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.MultiMap;
 import org.apache.commons.collections.map.MultiValueMap;
-import org.apache.flume.ChannelException;
 import org.apache.flume.channel.file.encryption.KeyProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -290,7 +289,9 @@ class ReplayHandler {
             record.getLogWriteOrderID());
        readCount++;
        if(readCount % 10000 == 0 && readCount > 0) {
-          LOG.info("Read " + readCount + " records");
+          LOG.info("read: " + readCount + ", put: " + putCount + ", take: "
+              + takeCount + ", rollback: " + rollbackCount + ", commit: "
+              + commitCount + ", skip: " + skipCount + ", eventCount:" + count);
        }
        if (record.getLogWriteOrderID() > lastCheckpoint) {
          if (type == TransactionEventRecord.Type.PUT.get()) {
@@ -339,6 +340,7 @@ class ReplayHandler {
      LOG.info("read: " + readCount + ", put: " + putCount + ", take: "
          + takeCount + ", rollback: " + rollbackCount + ", commit: "
          + commitCount + ", skip: " + skipCount + ", eventCount:" + count);
+      queue.replayComplete();
    } finally {
      TransactionIDOracle.setSeed(transactionIDSeed);
      WriteOrderOracle.setSeed(writeOrderIDSeed);
@@ -363,15 +365,9 @@ class ReplayHandler {
    count += uncommittedTakes;
    int pendingTakesSize = pendingTakes.size();
    if (pendingTakesSize > 0) {
-      String msg = "Pending takes " + pendingTakesSize
-          + " exist after the end of replay";
-      if (LOG.isDebugEnabled()) {
-        for (Long pointer : pendingTakes) {
-          LOG.debug("Pending take " + FlumeEventPointer.fromLong(pointer));
-        }
-      } else {
-        LOG.error(msg + ". Duplicate messages will exist in destination.");
-      }
+      LOG.info("Pending takes " + pendingTakesSize + " exist after the"
+          + " end of replay. Duplicate messages will exist in"
+          + " destination.");
    }
  }
  private LogRecord next() throws IOException, CorruptEventException {


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index f8160d9..d55660d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -98,7 +98,7 @@ public class Serialization {
       builder = new StringBuilder("Deleted the following files: ");
     }
     if(excludes == null) {
-      excludes = Collections.EMPTY_SET;
+      excludes = Collections.emptySet();
     }
     for (File file : files) {
       if(excludes.contains(file.getName())) {


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
index 1e0230d..c1de12e 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
@@ -32,11 +32,13 @@ public class TestCheckpoint {
   File file;
   File inflightPuts;
   File inflightTakes;
+  File queueSet;
   @Before
   public void setup() throws IOException {
     file = File.createTempFile("Checkpoint", "");
     inflightPuts = File.createTempFile("inflightPuts", "");
     inflightTakes = File.createTempFile("inflightTakes", "");
+    queueSet = File.createTempFile("queueset", "");
     Assert.assertTrue(file.isFile());
     Assert.assertTrue(file.canWrite());
   }
@@ -50,14 +52,14 @@ public class TestCheckpoint {
         new EventQueueBackingStoreFileV2(file, 1, "test");
     FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20);
     FlumeEventQueue queueIn = new FlumeEventQueue(backingStore,
-        inflightTakes, inflightPuts);
+        inflightTakes, inflightPuts, queueSet);
     queueIn.addHead(ptrIn);
     FlumeEventQueue queueOut = new FlumeEventQueue(backingStore,
-        inflightTakes, inflightPuts);
+        inflightTakes, inflightPuts, queueSet);
     Assert.assertEquals(0, queueOut.getLogWriteOrderID());
     queueIn.checkpoint(false);
     FlumeEventQueue queueOut2 = new FlumeEventQueue(backingStore,
-        inflightTakes, inflightPuts);
+        inflightTakes, inflightPuts, queueSet);
     FlumeEventPointer ptrOut = queueOut2.removeHead(0L);
     Assert.assertEquals(ptrIn, ptrOut);
     Assert.assertTrue(queueOut2.getLogWriteOrderID() > 0);


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
index 536af54..621d445 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
@@ -63,6 +63,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase {
     File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
     File inflightTakesFile = new File(checkpointDir, "inflighttakes");
     File inflightPutsFile = new File(checkpointDir, "inflightputs");
+    File queueSetDir = new File(checkpointDir, "queueset");
     Assert.assertTrue(checkpointFile.delete());
     Assert.assertTrue(metaDataFile.delete());
     Assert.assertTrue(inflightTakesFile.delete());
@@ -71,7 +72,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase {
         EventQueueBackingStoreFactory.get(checkpointFile, 50, "test");
     FlumeEventQueue queue =
         new FlumeEventQueue(backingStore, inflightTakesFile,
-            inflightPutsFile);
+            inflightPutsFile, queueSetDir);
     CheckpointRebuilder checkpointRebuilder =
         new CheckpointRebuilder(getAllLogs(dataDirs), queue);
     Assert.assertTrue(checkpointRebuilder.rebuild());


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
index dfb3bf9..52c706d 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
@@ -58,12 +58,14 @@ public class TestEventQueueBackingStoreFactory {
   File checkpoint;
   File inflightTakes;
   File inflightPuts;
+  File queueSetDir;
   @Before
   public void setup() throws IOException {
     baseDir = Files.createTempDir();
     checkpoint = new File(baseDir, "checkpoint");
     inflightTakes = new File(baseDir, "takes");
     inflightPuts = new File(baseDir, "puts");
+    queueSetDir = new File(baseDir, "queueset");
     TestUtils.copyDecompressed("fileformat-v2-checkpoint.gz", checkpoint);
   }
@@ -275,7 +277,7 @@ public class TestEventQueueBackingStoreFactory {
       List<Long> expectedPointers) throws Exception {
     FlumeEventQueue queue =
         new FlumeEventQueue(backingStore, inflightTakes,
-            inflightPuts);
+            inflightPuts, queueSetDir);
     List<Long> actualPointers = Lists.newArrayList();
     FlumeEventPointer ptr;
     while((ptr = queue.removeHead(0L)) != null) {


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index 203cbf2..1adb21a 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -44,6 +44,7 @@ public class TestFlumeEventQueue {
   FlumeEventPointer pointer1 = new FlumeEventPointer(1, 1);
   FlumeEventPointer pointer2 = new FlumeEventPointer(2, 2);
+  FlumeEventPointer pointer3 = new FlumeEventPointer(3, 3);
   FlumeEventQueue queue;
   EventQueueBackingStoreSupplier backingStoreSupplier;
   EventQueueBackingStore backingStore;
@@ -53,11 +54,13 @@ public class TestFlumeEventQueue {
     File checkpoint;
     File inflightTakes;
     File inflightPuts;
+    File queueSetDir;
     EventQueueBackingStoreSupplier() {
       baseDir = Files.createTempDir();
       checkpoint = new File(baseDir, "checkpoint");
       inflightTakes = new File(baseDir, "inflightputs");
       inflightPuts = new File(baseDir, "inflighttakes");
+      queueSetDir = new File(baseDir, "queueset");
     }
     File getCheckpoint() {
       return checkpoint;
@@ -68,6 +71,9 @@ public class TestFlumeEventQueue {
     File getInflightTakes() {
       return inflightTakes;
     }
+    File getQueueSetDir() {
+      return queueSetDir;
+    }
     void delete() {
       FileUtils.deleteQuietly(baseDir);
     }
@@ -120,7 +126,8 @@ public class TestFlumeEventQueue {
     backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test");
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertFalse(queue.addTail(pointer2));
   }
@@ -132,7 +139,8 @@ public class TestFlumeEventQueue {
     backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test");
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
   }
   @Test(expected=IllegalArgumentException.class)
   public void testInvalidCapacityNegative() throws Exception {
@@ -142,20 +150,23 @@ public class TestFlumeEventQueue {
     backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test");
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
   }
   @Test
   public void testQueueIsEmptyAfterCreation() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertNull(queue.removeHead(0L));
   }
   @Test
   public void addTail1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
@@ -164,7 +175,8 @@ public void addTail2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -175,7 +187,8 @@ public class TestFlumeEventQueue {
   public void addTailLarge() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -194,7 +207,8 @@ public class TestFlumeEventQueue {
   public void addHead1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertEquals(pointer1, queue.removeHead(0));
@@ -204,7 +218,9 @@ public class TestFlumeEventQueue {
   public void addHead2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
+    queue.replayComplete();
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -215,7 +231,9 @@ public class TestFlumeEventQueue {
   public void addHeadLarge() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
+    queue.replayComplete();
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -234,10 +252,12 @@ public class TestFlumeEventQueue {
   public void addTailRemove1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertTrue(queue.remove(pointer1));
+    queue.replayComplete();
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
     Assert.assertNull(queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
@@ -247,10 +267,12 @@ public class TestFlumeEventQueue {
   public void addTailRemove2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
+    queue.replayComplete();
     Assert.assertEquals(pointer2, queue.removeHead(0));
   }
@@ -258,7 +280,8 @@ public class TestFlumeEventQueue {
   public void addHeadRemove1() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     queue.addHead(pointer1);
     Assert.assertTrue(queue.remove(pointer1));
     Assert.assertNull(queue.removeHead(0));
@@ -267,17 +290,43 @@ public class TestFlumeEventQueue {
   public void addHeadRemove2() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
+    queue.replayComplete();
     Assert.assertEquals(pointer2, queue.removeHead(0));
   }
   @Test
+  public void testUnknownPointerDoesNotCauseSearch() throws Exception {
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
+    Assert.assertTrue(queue.addHead(pointer1));
+    Assert.assertTrue(queue.addHead(pointer2));
+    Assert.assertFalse(queue.remove(pointer3)); // does search
+    Assert.assertTrue(queue.remove(pointer1));
+    Assert.assertTrue(queue.remove(pointer2));
+    queue.replayComplete();
+    Assert.assertEquals(2, queue.getSearchCount());
+  }
+  @Test(expected=IllegalStateException.class)
+  public void testRemoveAfterReplayComplete() throws Exception {
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
+    queue.replayComplete();
+    queue.remove(pointer1);
+  }
+  @Test
   public void testWrappingCorrectly() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     int size = Integer.MAX_VALUE;
     for (int i = 1; i <= size; i++) {
       if(!queue.addHead(new FlumeEventPointer(i, i))) {
@@ -299,7 +348,8 @@ public class TestFlumeEventQueue {
   public void testInflightPuts() throws Exception{
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
     long txnID2 = txnID1 + 1;
     queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -309,7 +359,8 @@ public class TestFlumeEventQueue {
     TimeUnit.SECONDS.sleep(3L);
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
     Assert.assertTrue(deserializedMap.get(
        txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
@@ -323,7 +374,8 @@ public class TestFlumeEventQueue {
   public void testInflightTakes() throws Exception {
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
     long txnID2 = txnID1 + 1;
     queue.addTail(new FlumeEventPointer(1, 1));
@@ -336,7 +388,8 @@ public class TestFlumeEventQueue {
     TimeUnit.SECONDS.sleep(3L);
     queue = new FlumeEventQueue(backingStore,
        backingStoreSupplier.getInflightTakes(),
-        backingStoreSupplier.getInflightPuts());
+        backingStoreSupplier.getInflightPuts(),
+        backingStoreSupplier.getQueueSetDir());
     SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes();
     Assert.assertTrue(deserializedMap.get(
        txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
@@ -353,7 +406,8 @@ public class TestFlumeEventQueue {
     try {
       queue = new FlumeEventQueue(backingStore,
          backingStoreSupplier.getInflightTakes(),
-          backingStoreSupplier.getInflightPuts());
+          backingStoreSupplier.getInflightPuts(),
+          backingStoreSupplier.getQueueSetDir());
      long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
      long txnID2 = txnID1 + 1;
      queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -367,7 +421,8 @@ public class TestFlumeEventQueue {
      inflight.writeInt(new Random().nextInt());
      queue = new FlumeEventQueue(backingStore,
          backingStoreSupplier.getInflightTakes(),
-          backingStoreSupplier.getInflightPuts());
+          backingStoreSupplier.getInflightPuts(),
+          backingStoreSupplier.getQueueSetDir());
      SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
      Assert.assertTrue(deserializedMap.get(
          txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
@@ -386,7 +441,8 @@ public class TestFlumeEventQueue {
     try {
       queue = new FlumeEventQueue(backingStore,
          backingStoreSupplier.getInflightTakes(),
-          backingStoreSupplier.getInflightPuts());
+          backingStoreSupplier.getInflightPuts(),
+          backingStoreSupplier.getQueueSetDir());
      long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
      long txnID2 = txnID1 + 1;
      queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -400,7 +456,8 @@ public class TestFlumeEventQueue {
      inflight.writeInt(new Random().nextInt());
      queue = new FlumeEventQueue(backingStore,
          backingStoreSupplier.getInflightTakes(),
-          backingStoreSupplier.getInflightPuts());
+          backingStoreSupplier.getInflightPuts(),
+          backingStoreSupplier.getQueueSetDir());
      SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes();
      Assert.assertTrue(deserializedMap.get(
          txnID1).contains(new FlumeEventPointer(1, 1).toLong()));


http://git-wip-us.apache.org/repos/asf/flume/blob/6373032a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 267925f..77b7f22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -774,6 +774,12 @@ limitations under the License.
       <version>4.2.1</version>
     </dependency>

+    <dependency>
+      <groupId>org.mapdb</groupId>
+      <artifactId>mapdb</artifactId>
+      <version>0.9.7</version>
+    </dependency>
+
     <!-- Gson: Java to Json conversion -->
     <dependency>
       <groupId>com.google.code.gson</groupId>
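
The gist of the patch, for anyone skimming the diff above: FlumeEventQueue.remove(FlumeEventPointer) is only called while the ReplayHandler replays takes, and it previously scanned every slot of the backing store for each pointer. The patch builds a file-backed MapDB set of the pointers currently in the queue, so a pointer that is not present is rejected with a single set lookup, keeps the set in sync as addHead/addTail/remove mutate the queue, and throws the index away in replayComplete() once replay finishes. Below is a minimal sketch of that idea using the same MapDB 0.9.7 calls the patch uses; the class and method names here (ReplayIndex, mightContain, onAdd, onRemove) are illustrative only and are not part of Flume.

import java.io.File;
import java.util.Set;

import org.mapdb.DB;
import org.mapdb.DBMaker;

// Illustrative sketch only -- not the Flume class. It shows how a file-backed
// MapDB set lets replay reject a take for a pointer that is not in the queue
// without scanning the whole backing store.
public class ReplayIndex {

  private DB db;
  private Set<Long> queueSet;

  // Build the index once, before replay, from the pointers already in the queue.
  ReplayIndex(File indexDir, long[] currentPointers) {
    db = DBMaker.newFileDB(new File(indexDir, "db"))
        .transactionDisable()      // throwaway data, no durability needed
        .deleteFilesAfterClose()   // the index does not outlive the replay
        .make();
    queueSet = db.createTreeSet("QueueSet").make();
    for (long pointer : currentPointers) {
      queueSet.add(pointer);
    }
  }

  // Cheap membership test; a miss means remove() can return false immediately.
  boolean mightContain(long pointer) {
    return queueSet.contains(pointer);
  }

  // Keep the index in step with the queue while replay mutates it.
  void onAdd(long pointer) {
    queueSet.add(pointer);
  }

  void onRemove(long pointer) {
    queueSet.remove(pointer);
  }

  // Mirrors replayComplete(): drop the index as soon as replay is done.
  void close() {
    db.close();
    queueSet = null;
    db = null;
  }
}

Disabling transactions and deleting the MapDB files on close fit this use case: the index only has to live for the duration of one replay, and keeping it in a file-backed store rather than an in-memory java.util.HashSet keeps the channel's heap footprint flat even when millions of events are queued.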