Updated Branches: refs/heads/trunk f269c69bc -> f26c79f6d
FLUME-1543. TestFileChannel should be factored into many tests. (Brock Noland via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f26c79f6 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f26c79f6 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f26c79f6 Branch: refs/heads/trunk Commit: f26c79f6d32b7893e0d60326e6992faa8ead0785 Parents: f269c69 Author: Mike Percy <[email protected]> Authored: Sat Sep 8 09:10:17 2012 -0700 Committer: Mike Percy <[email protected]> Committed: Sat Sep 8 09:10:17 2012 -0700 ---------------------------------------------------------------------- .../apache/flume/channel/file/TestFileChannel.java | 296 +-------------- .../flume/channel/file/TestFileChannelBase.java | 82 ++++ .../file/TestFileChannelFormatRegression.java | 113 ++++++ .../flume/channel/file/TestFileChannelRestart.java | 153 ++++++++ .../channel/file/TestFileChannelRollback.java | 139 +++++++ 5 files changed, 490 insertions(+), 293 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/f26c79f6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index e2abc27..7c1aaab 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -20,12 +20,9 @@ package org.apache.flume.channel.file; import static org.apache.flume.channel.file.TestUtils.*; -import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -38,14 +35,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.commons.io.FileUtils; import org.apache.flume.ChannelException; -import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurables; -import org.apache.flume.event.EventBuilder; -import org.apache.flume.sink.LoggerSink; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -53,56 +46,22 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.common.io.Files; -public class TestFileChannel { +public class TestFileChannel extends TestFileChannelBase { private static final Logger LOG = LoggerFactory .getLogger(TestFileChannel.class); - private FileChannel channel; - private File baseDir; - private File checkpointDir; - private File[] dataDirs; - private String dataDir; @Before public void setup() { - baseDir = Files.createTempDir(); - checkpointDir = new File(baseDir, "chkpt"); - Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory()); - dataDirs = new File[3]; - dataDir = ""; - for (int i = 0; i < dataDirs.length; i++) { - dataDirs[i] = new File(baseDir, "data" + (i+1)); - Assert.assertTrue(dataDirs[i].mkdirs() || dataDirs[i].isDirectory()); - dataDir += dataDirs[i].getAbsolutePath() + ","; - } - dataDir = dataDir.substring(0, dataDir.length() - 1); - channel = createFileChannel(); + super.setup(); } @After public void teardown() { - if(channel != null && channel.isOpen()) { - channel.stop(); - } - FileUtils.deleteQuietly(baseDir); - } - private Context createContext() { - return createContext(new HashMap<String, String>()); - } - private Context createContext(Map<String, String> overrides) { - return TestUtils.createFileChannelContext(checkpointDir.getAbsolutePath(), - dataDir, overrides); - } - private FileChannel createFileChannel() { - return createFileChannel(new HashMap<String, String>()); - } - private FileChannel createFileChannel(Map<String, String> overrides) { - return TestUtils.createFileChannel(checkpointDir.getAbsolutePath(), dataDir, overrides); + super.teardown(); } @Test public void testFailAfterTakeBeforeCommit() throws Throwable { @@ -203,108 +162,6 @@ public class TestFileChannel { } @Test - public void testRestartLogReplayV1() throws Exception { - doTestRestart(true, false, false, false); - } - @Test - public void testRestartLogReplayV2() throws Exception { - doTestRestart(false, false, false, false); - } - - @Test - public void testFastReplayV1() throws Exception { - doTestRestart(true, true, true, true); - } - - @Test - public void testFastReplayV2() throws Exception { - doTestRestart(false, true, true, true); - } - - @Test - public void testNormalReplayV1() throws Exception { - doTestRestart(true, true, true, false); - } - - @Test - public void testNormalReplayV2() throws Exception { - doTestRestart(false, true, true, false); - } - - public void doTestRestart(boolean useLogReplayV1, - boolean forceCheckpoint, boolean deleteCheckpoint, - boolean useFastReplay) throws Exception { - Map<String, String> overrides = Maps.newHashMap(); - overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1, - String.valueOf(useLogReplayV1)); - overrides.put( - FileChannelConfiguration.USE_FAST_REPLAY, - String.valueOf(useFastReplay)); - channel = createFileChannel(overrides); - channel.start(); - Assert.assertTrue(channel.isOpen()); - Set<String> in = Sets.newHashSet(); - try { - while(true) { - in.addAll(putEvents(channel, "restart", 1, 1)); - } - } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity. [channel=" - +channel.getName()+"]", e.getMessage()); - } - if (forceCheckpoint) { - forceCheckpoint(channel); - } - channel.stop(); - if(deleteCheckpoint) { - File checkpoint = new File(checkpointDir, "checkpoint"); - Assert.assertTrue(checkpoint.delete()); - File checkpointMetaData = Serialization.getMetaDataFile(checkpoint); - Assert.assertTrue(checkpointMetaData.delete()); - } - channel = createFileChannel(overrides); - channel.start(); - Assert.assertTrue(channel.isOpen()); - Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE); - compareInputAndOut(in, out); - } - @Test - public void testRestartFailsWhenMetaDataExistsButCheckpointDoesNot() - throws Exception { - Map<String, String> overrides = Maps.newHashMap(); - channel = createFileChannel(overrides); - channel.start(); - Assert.assertTrue(channel.isOpen()); - Assert.assertEquals(1, putEvents(channel, "restart", 1, 1).size()); - forceCheckpoint(channel); - channel.stop(); - File checkpoint = new File(checkpointDir, "checkpoint"); - Assert.assertTrue(checkpoint.delete()); - File checkpointMetaData = Serialization.getMetaDataFile(checkpoint); - Assert.assertTrue(checkpointMetaData.exists()); - channel = createFileChannel(overrides); - channel.start(); - Assert.assertFalse(channel.isOpen()); - } - @Test - public void testRestartFailsWhenCheckpointExistsButMetaDoesNot() - throws Exception { - Map<String, String> overrides = Maps.newHashMap(); - channel = createFileChannel(overrides); - channel.start(); - Assert.assertTrue(channel.isOpen()); - Assert.assertEquals(1, putEvents(channel, "restart", 1, 1).size()); - forceCheckpoint(channel); - channel.stop(); - File checkpoint = new File(checkpointDir, "checkpoint"); - File checkpointMetaData = Serialization.getMetaDataFile(checkpoint); - Assert.assertTrue(checkpointMetaData.delete()); - Assert.assertTrue(checkpoint.exists()); - channel = createFileChannel(overrides); - channel.start(); - Assert.assertFalse(channel.isOpen()); - } - @Test public void testReconfigure() throws Exception { channel.start(); Assert.assertTrue(channel.isOpen()); @@ -335,27 +192,6 @@ public class TestFileChannel { compareInputAndOut(expected, actual); } @Test - public void testRollbackAfterNoPutTake() throws Exception { - channel.start(); - Assert.assertTrue(channel.isOpen()); - Transaction transaction; - transaction = channel.getTransaction(); - transaction.begin(); - transaction.rollback(); - transaction.close(); - - // ensure we can reopen log with no error - channel.stop(); - channel = createFileChannel(); - channel.start(); - Assert.assertTrue(channel.isOpen()); - transaction = channel.getTransaction(); - transaction.begin(); - Assert.assertNull(channel.take()); - transaction.commit(); - transaction.close(); - } - @Test public void testCommitAfterNoPutTake() throws Exception { channel.start(); Assert.assertTrue(channel.isOpen()); @@ -466,72 +302,6 @@ public class TestFileChannel { Assert.assertTrue(channel.isOpen()); } @Test - public void testRollbackSimulatedCrash() throws Exception { - channel.start(); - Assert.assertTrue(channel.isOpen()); - int numEvents = 50; - Set<String> in = putEvents(channel, "rollback", 1, numEvents); - - Transaction transaction; - // put an item we will rollback - transaction = channel.getTransaction(); - transaction.begin(); - channel.put(EventBuilder.withBody("rolled back".getBytes(Charsets.UTF_8))); - transaction.rollback(); - transaction.close(); - - // simulate crash - channel.stop(); - channel = createFileChannel(); - channel.start(); - Assert.assertTrue(channel.isOpen()); - - // we should not get the rolled back item - Set<String> out = takeEvents(channel, 1, numEvents); - compareInputAndOut(in, out); - } - @Test - public void testRollbackSimulatedCrashWithSink() throws Exception { - channel.start(); - Assert.assertTrue(channel.isOpen()); - int numEvents = 100; - - LoggerSink sink = new LoggerSink(); - sink.setChannel(channel); - // sink will leave one item - CountingSinkRunner runner = new CountingSinkRunner(sink, numEvents - 1); - runner.start(); - putEvents(channel, "rollback", 10, numEvents); - - Transaction transaction; - // put an item we will rollback - transaction = channel.getTransaction(); - transaction.begin(); - byte[] bytes = "rolled back".getBytes(Charsets.UTF_8); - channel.put(EventBuilder.withBody(bytes)); - transaction.rollback(); - transaction.close(); - - while(runner.isAlive()) { - Thread.sleep(10L); - } - Assert.assertEquals(numEvents - 1, runner.getCount()); - for(Exception ex : runner.getErrors()) { - LOG.warn("Sink had error", ex); - } - Assert.assertEquals(Collections.EMPTY_LIST, runner.getErrors()); - - // simulate crash - channel.stop(); - channel = createFileChannel(); - channel.start(); - Assert.assertTrue(channel.isOpen()); - Set<String> out = takeEvents(channel, 1, 1); - Assert.assertEquals(1, out.size()); - String s = out.iterator().next(); - Assert.assertTrue(s, s.startsWith("rollback-90-9")); - } - @Test public void testThreaded() throws IOException, InterruptedException { channel.start(); Assert.assertTrue(channel.isOpen()); @@ -612,67 +382,7 @@ public class TestFileChannel { fileChannel.start(); Assert.assertTrue(!fileChannel.isOpen()); } - /** - * This is regression test with files generated by a file channel - * with the FLUME-1432 patch. - */ - @Test - public void testFileFormatV2postFLUME1432() - throws Exception { - TestUtils.copyDecompressed("fileformat-v2-checkpoint.gz", - new File(checkpointDir, "checkpoint")); - for (int i = 0; i < dataDirs.length; i++) { - int fileIndex = i + 1; - TestUtils.copyDecompressed("fileformat-v2-log-"+fileIndex+".gz", - new File(dataDirs[i], "log-" + fileIndex)); - } - Map<String, String> overrides = Maps.newHashMap(); - overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10)); - channel = createFileChannel(overrides); - channel.start(); - Assert.assertTrue(channel.isOpen()); - Set<String> events = takeEvents(channel, 1); - Set<String> expected = new HashSet<String>(); - expected.addAll(Arrays.asList( - (new String[]{ - "2684", "2685", "2686", "2687", "2688", "2689", "2690", "2691" - }))); - compareInputAndOut(expected, events); - } - /** - * This is a regression test with files generated by a file channel - * without the FLUME-1432 patch. - */ - @Test - public void testFileFormatV2PreFLUME1432LogReplayV1() - throws Exception { - doTestFileFormatV2PreFLUME1432(true); - } - @Test - public void testFileFormatV2PreFLUME1432LogReplayV2() - throws Exception { - doTestFileFormatV2PreFLUME1432(false); - } - public void doTestFileFormatV2PreFLUME1432(boolean useLogReplayV1) - throws Exception { - TestUtils.copyDecompressed("fileformat-v2-pre-FLUME-1432-checkpoint.gz", - new File(checkpointDir, "checkpoint")); - for (int i = 0; i < dataDirs.length; i++) { - int fileIndex = i + 1; - TestUtils.copyDecompressed("fileformat-v2-pre-FLUME-1432-log-" + fileIndex - + ".gz", new File(dataDirs[i], "log-" + fileIndex)); - } - Map<String, String> overrides = Maps.newHashMap(); - overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000)); - overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1, - String.valueOf(useLogReplayV1)); - channel = createFileChannel(overrides); - channel.start(); - Assert.assertTrue(channel.isOpen()); - Set<String> events = takeEvents(channel, 1); - Assert.assertEquals(50, events.size()); - } /** * Test contributed by Brock Noland during code review. http://git-wip-us.apache.org/repos/asf/flume/blob/f26c79f6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java new file mode 100644 index 0000000..4655978 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.io.FileUtils; +import org.apache.flume.Context; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import com.google.common.io.Files; + +public class TestFileChannelBase { + + protected FileChannel channel; + protected File baseDir; + protected File checkpointDir; + protected File[] dataDirs; + protected String dataDir; + + @Before + public void setup() { + baseDir = Files.createTempDir(); + checkpointDir = new File(baseDir, "chkpt"); + Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory()); + dataDirs = new File[3]; + dataDir = ""; + for (int i = 0; i < dataDirs.length; i++) { + dataDirs[i] = new File(baseDir, "data" + (i + 1)); + Assert.assertTrue(dataDirs[i].mkdirs() || dataDirs[i].isDirectory()); + dataDir += dataDirs[i].getAbsolutePath() + ","; + } + dataDir = dataDir.substring(0, dataDir.length() - 1); + channel = createFileChannel(); + } + + @After + public void teardown() { + if (channel != null && channel.isOpen()) { + channel.stop(); + } + FileUtils.deleteQuietly(baseDir); + } + + protected Context createContext() { + return createContext(new HashMap<String, String>()); + } + + protected Context createContext(Map<String, String> overrides) { + return TestUtils.createFileChannelContext(checkpointDir.getAbsolutePath(), + dataDir, overrides); + } + + protected FileChannel createFileChannel() { + return createFileChannel(new HashMap<String, String>()); + } + + protected FileChannel createFileChannel(Map<String, String> overrides) { + return TestUtils.createFileChannel(checkpointDir.getAbsolutePath(), + dataDir, overrides); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/f26c79f6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java new file mode 100644 index 0000000..8fc0faf --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelFormatRegression.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import static org.apache.flume.channel.file.TestUtils.*; + +import java.io.File; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + + +public class TestFileChannelFormatRegression extends TestFileChannelBase { + protected static final Logger LOG = LoggerFactory + .getLogger(TestFileChannelFormatRegression.class); + + @Before + public void setup() { + super.setup(); + } + + @After + public void teardown() { + super.teardown(); + } + /** + * This is regression test with files generated by a file channel + * with the FLUME-1432 patch. + */ + @Test + public void testFileFormatV2postFLUME1432() + throws Exception { + TestUtils.copyDecompressed("fileformat-v2-checkpoint.gz", + new File(checkpointDir, "checkpoint")); + for (int i = 0; i < dataDirs.length; i++) { + int fileIndex = i + 1; + TestUtils.copyDecompressed("fileformat-v2-log-"+fileIndex+".gz", + new File(dataDirs[i], "log-" + fileIndex)); + } + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10)); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> events = takeEvents(channel, 1); + Set<String> expected = new HashSet<String>(); + expected.addAll(Arrays.asList( + (new String[]{ + "2684", "2685", "2686", "2687", "2688", "2689", "2690", "2691" + }))); + compareInputAndOut(expected, events); + + } + /** + * This is a regression test with files generated by a file channel + * without the FLUME-1432 patch. + */ + @Test + public void testFileFormatV2PreFLUME1432LogReplayV1() + throws Exception { + doTestFileFormatV2PreFLUME1432(true); + } + @Test + public void testFileFormatV2PreFLUME1432LogReplayV2() + throws Exception { + doTestFileFormatV2PreFLUME1432(false); + } + public void doTestFileFormatV2PreFLUME1432(boolean useLogReplayV1) + throws Exception { + TestUtils.copyDecompressed("fileformat-v2-pre-FLUME-1432-checkpoint.gz", + new File(checkpointDir, "checkpoint")); + for (int i = 0; i < dataDirs.length; i++) { + int fileIndex = i + 1; + TestUtils.copyDecompressed("fileformat-v2-pre-FLUME-1432-log-" + fileIndex + + ".gz", new File(dataDirs[i], "log-" + fileIndex)); + } + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000)); + overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1, + String.valueOf(useLogReplayV1)); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> events = takeEvents(channel, 1); + Assert.assertEquals(50, events.size()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/f26c79f6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java new file mode 100644 index 0000000..cf61aed --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import static org.apache.flume.channel.file.TestUtils.*; + +import java.io.File; +import java.util.Map; +import java.util.Set; + +import org.apache.flume.ChannelException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class TestFileChannelRestart extends TestFileChannelBase { + protected static final Logger LOG = LoggerFactory + .getLogger(TestFileChannelRestart.class); + + @Before + public void setup() { + super.setup(); + } + + @After + public void teardown() { + super.teardown(); + } + @Test + public void testRestartLogReplayV1() throws Exception { + doTestRestart(true, false, false, false); + } + @Test + public void testRestartLogReplayV2() throws Exception { + doTestRestart(false, false, false, false); + } + + @Test + public void testFastReplayV1() throws Exception { + doTestRestart(true, true, true, true); + } + + @Test + public void testFastReplayV2() throws Exception { + doTestRestart(false, true, true, true); + } + + @Test + public void testNormalReplayV1() throws Exception { + doTestRestart(true, true, true, false); + } + + @Test + public void testNormalReplayV2() throws Exception { + doTestRestart(false, true, true, false); + } + + public void doTestRestart(boolean useLogReplayV1, + boolean forceCheckpoint, boolean deleteCheckpoint, + boolean useFastReplay) throws Exception { + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1, + String.valueOf(useLogReplayV1)); + overrides.put( + FileChannelConfiguration.USE_FAST_REPLAY, + String.valueOf(useFastReplay)); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> in = Sets.newHashSet(); + try { + while(true) { + in.addAll(putEvents(channel, "restart", 1, 1)); + } + } catch (ChannelException e) { + Assert.assertEquals("Cannot acquire capacity. [channel=" + +channel.getName()+"]", e.getMessage()); + } + if (forceCheckpoint) { + forceCheckpoint(channel); + } + channel.stop(); + if(deleteCheckpoint) { + File checkpoint = new File(checkpointDir, "checkpoint"); + Assert.assertTrue(checkpoint.delete()); + File checkpointMetaData = Serialization.getMetaDataFile(checkpoint); + Assert.assertTrue(checkpointMetaData.delete()); + } + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> out = takeEvents(channel, 1, Integer.MAX_VALUE); + compareInputAndOut(in, out); + } + @Test + public void testRestartFailsWhenMetaDataExistsButCheckpointDoesNot() + throws Exception { + Map<String, String> overrides = Maps.newHashMap(); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Assert.assertEquals(1, putEvents(channel, "restart", 1, 1).size()); + forceCheckpoint(channel); + channel.stop(); + File checkpoint = new File(checkpointDir, "checkpoint"); + Assert.assertTrue(checkpoint.delete()); + File checkpointMetaData = Serialization.getMetaDataFile(checkpoint); + Assert.assertTrue(checkpointMetaData.exists()); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertFalse(channel.isOpen()); + } + @Test + public void testRestartFailsWhenCheckpointExistsButMetaDoesNot() + throws Exception { + Map<String, String> overrides = Maps.newHashMap(); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Assert.assertEquals(1, putEvents(channel, "restart", 1, 1).size()); + forceCheckpoint(channel); + channel.stop(); + File checkpoint = new File(checkpointDir, "checkpoint"); + File checkpointMetaData = Serialization.getMetaDataFile(checkpoint); + Assert.assertTrue(checkpointMetaData.delete()); + Assert.assertTrue(checkpoint.exists()); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertFalse(channel.isOpen()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/f26c79f6/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java new file mode 100644 index 0000000..5b1a088 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRollback.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import static org.apache.flume.channel.file.TestUtils.*; + +import java.util.Collections; +import java.util.Set; + +import org.apache.flume.Transaction; +import org.apache.flume.event.EventBuilder; +import org.apache.flume.sink.LoggerSink; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Charsets; + + +public class TestFileChannelRollback extends TestFileChannelBase { + protected static final Logger LOG = LoggerFactory + .getLogger(TestFileChannelRollback.class); + + @Before + public void setup() { + super.setup(); + } + + @After + public void teardown() { + super.teardown(); + } + @Test + public void testRollbackAfterNoPutTake() throws Exception { + channel.start(); + Assert.assertTrue(channel.isOpen()); + Transaction transaction; + transaction = channel.getTransaction(); + transaction.begin(); + transaction.rollback(); + transaction.close(); + + // ensure we can reopen log with no error + channel.stop(); + channel = createFileChannel(); + channel.start(); + Assert.assertTrue(channel.isOpen()); + transaction = channel.getTransaction(); + transaction.begin(); + Assert.assertNull(channel.take()); + transaction.commit(); + transaction.close(); + } + @Test + public void testRollbackSimulatedCrash() throws Exception { + channel.start(); + Assert.assertTrue(channel.isOpen()); + int numEvents = 50; + Set<String> in = putEvents(channel, "rollback", 1, numEvents); + + Transaction transaction; + // put an item we will rollback + transaction = channel.getTransaction(); + transaction.begin(); + channel.put(EventBuilder.withBody("rolled back".getBytes(Charsets.UTF_8))); + transaction.rollback(); + transaction.close(); + + // simulate crash + channel.stop(); + channel = createFileChannel(); + channel.start(); + Assert.assertTrue(channel.isOpen()); + + // we should not get the rolled back item + Set<String> out = takeEvents(channel, 1, numEvents); + compareInputAndOut(in, out); + } + @Test + public void testRollbackSimulatedCrashWithSink() throws Exception { + channel.start(); + Assert.assertTrue(channel.isOpen()); + int numEvents = 100; + + LoggerSink sink = new LoggerSink(); + sink.setChannel(channel); + // sink will leave one item + CountingSinkRunner runner = new CountingSinkRunner(sink, numEvents - 1); + runner.start(); + putEvents(channel, "rollback", 10, numEvents); + + Transaction transaction; + // put an item we will rollback + transaction = channel.getTransaction(); + transaction.begin(); + byte[] bytes = "rolled back".getBytes(Charsets.UTF_8); + channel.put(EventBuilder.withBody(bytes)); + transaction.rollback(); + transaction.close(); + + while(runner.isAlive()) { + Thread.sleep(10L); + } + Assert.assertEquals(numEvents - 1, runner.getCount()); + for(Exception ex : runner.getErrors()) { + LOG.warn("Sink had error", ex); + } + Assert.assertEquals(Collections.EMPTY_LIST, runner.getErrors()); + + // simulate crash + channel.stop(); + channel = createFileChannel(); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> out = takeEvents(channel, 1, 1); + Assert.assertEquals(1, out.size()); + String s = out.iterator().next(); + Assert.assertTrue(s, s.startsWith("rollback-90-9")); + } +}
