Updated Branches: refs/heads/flume-1.3.0 a221a8e99 -> 892521fd5
FLUME-1606. Rollbacks of Put transactions does not clear the transaction from inflight puts. (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/892521fd Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/892521fd Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/892521fd Branch: refs/heads/flume-1.3.0 Commit: 892521fd5b216d9691cec235fa12822e9dfa7e87 Parents: a221a8e Author: Mike Percy <[email protected]> Authored: Mon Sep 24 22:51:04 2012 -0700 Committer: Mike Percy <[email protected]> Committed: Mon Sep 24 22:52:30 2012 -0700 ---------------------------------------------------------------------- .../org/apache/flume/channel/file/FileChannel.java | 4 +- .../apache/flume/channel/file/FlumeEventQueue.java | 7 ++- .../apache/flume/channel/file/TestFileChannel.java | 64 ++++++++++++++- 3 files changed, 71 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/892521fd/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index ca7db70..bdc9f04 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -387,7 +387,7 @@ public class FileChannel extends BasicChannelSemantics { // this does not need to be in the critical section as it does not // modify the structure of the log or queue. if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) { - throw new ChannelException("The channel has reached it's capacity. " + throw new ChannelException("The channel has reached it's capacity. " + "This might be the result of a sink on the channel having too " + "low of batch size, a downstream system running slower than " + "normal, or that the channel capacity is just too low. " @@ -537,11 +537,11 @@ public class FileChannel extends BasicChannelSemantics { "Queue add failed, this shouldn't be able to happen " + channelNameDescriptor); } - queue.completeTransaction(transactionID); } } putList.clear(); takeList.clear(); + queue.completeTransaction(transactionID); channelCounter.setChannelSize(queue.getSize()); } catch (IOException e) { throw new ChannelException("Commit failed due to IO error " http://git-wip-us.apache.org/repos/asf/flume/blob/892521fd/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java index a8df042..36553c5 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java @@ -331,7 +331,7 @@ final class FlumeEventQueue { * None of the methods are thread safe, and should be called from thread * safe methods only. */ - private class InflightEventWrapper { + class InflightEventWrapper { private SetMultimap<Long, Long> inflightEvents = HashMultimap.create(); private RandomAccessFile file; private volatile java.nio.channels.FileChannel fileChannel; @@ -516,5 +516,10 @@ final class FlumeEventQueue { public Collection<Integer> getFileIDs(){ return inflightFileIDs.values(); } + + //Needed for testing. + public Collection<Long> getInFlightPointers() { + return inflightEvents.values(); + } } } http://git-wip-us.apache.org/repos/asf/flume/blob/892521fd/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 41b1fbb..87a0a3f 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -19,6 +19,7 @@ package org.apache.flume.channel.file; import static org.apache.flume.channel.file.TestUtils.*; +import static org.fest.reflect.core.Reflection.*; import java.io.IOException; import java.util.ArrayList; @@ -49,6 +50,9 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.flume.channel.file.FileChannel.FileBackedTransaction; +import org.apache.flume.channel.file.FlumeEventQueue.InflightEventWrapper; +import org.apache.flume.event.EventBuilder; public class TestFileChannel extends TestFileChannelBase { @@ -171,7 +175,7 @@ public class TestFileChannel extends TestFileChannelBase { in.addAll(putEvents(channel, "reconfig", 1, 1)); } } catch (ChannelException e) { - Assert.assertEquals("The channel has reached it's capacity. " + Assert.assertEquals("The channel has reached it's capacity. " + "This might be the result of a sink on the channel having too " + "low of batch size, a downstream system running slower than " + "normal, or that the channel capacity is just too low. [channel=" @@ -476,4 +480,62 @@ public class TestFileChannel extends TestFileChannelBase { }).get(); Assert.assertEquals(15, takenEvents.size()); } + + // This test will fail without FLUME-1606. + @Test + public void testRollbackIncompleteTransaction() throws Exception { + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, + String.valueOf(Integer.MAX_VALUE)); + final FileChannel channel = createFileChannel(overrides); + channel.start(); + FileBackedTransaction tx = (FileBackedTransaction) channel.getTransaction(); + + InflightEventWrapper inflightPuts = + field("inflightPuts").ofType(InflightEventWrapper.class).in( + field("queue").ofType(FlumeEventQueue.class).in(tx).get()).get(); + + tx.begin(); + + for (int i = 0; i < 100; i++) { + channel.put(EventBuilder.withBody("TestEvent".getBytes())); + } + + Assert.assertFalse(inflightPuts.getFileIDs().isEmpty()); + Assert.assertFalse(inflightPuts.getInFlightPointers().isEmpty()); + + tx.rollback(); + tx.close(); + + Assert.assertTrue(inflightPuts.getFileIDs().isEmpty()); + Assert.assertTrue(inflightPuts.getInFlightPointers().isEmpty()); + Assert.assertTrue(channel.getDepth() == 0); + + Set<String> in = putEvents(channel, "testing-rollbacks", 100, 100); + + tx = (FileBackedTransaction) channel.getTransaction(); + + InflightEventWrapper inflightTakes = + field("inflightTakes").ofType(InflightEventWrapper.class).in( + field("queue").ofType(FlumeEventQueue.class).in(tx).get()).get(); + + tx.begin(); + + for (int i = 0; i < 100; i++) { + channel.take(); + } + + Assert.assertFalse(inflightTakes.getFileIDs().isEmpty()); + Assert.assertFalse(inflightTakes.getInFlightPointers().isEmpty()); + + tx.rollback(); + tx.close(); + + + Assert.assertTrue(inflightTakes.getFileIDs().isEmpty()); + Assert.assertTrue(inflightTakes.getInFlightPointers().isEmpty()); + Assert.assertTrue(channel.getDepth() == in.size()); + + } + }
