http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java index d09ddde..0173390 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java @@ -18,61 +18,152 @@ */ package org.apache.flume.channel.file; -import com.google.common.collect.SetMultimap; import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; -import java.util.Random; -import java.util.concurrent.TimeUnit; +import com.google.common.io.Files; +@RunWith(value = Parameterized.class) public class TestFlumeEventQueue { - - File file; - File inflightTakes; - File inflightPuts; FlumeEventPointer pointer1 = new FlumeEventPointer(1, 1); FlumeEventPointer pointer2 = new FlumeEventPointer(2, 2); FlumeEventQueue queue; + EventQueueBackingStoreSupplier backingStoreSupplier; + EventQueueBackingStore backingStore; + + private abstract static class EventQueueBackingStoreSupplier { + File baseDir; + File checkpoint; + File inflightTakes; + File inflightPuts; + 
EventQueueBackingStoreSupplier() { + baseDir = Files.createTempDir(); + checkpoint = new File(baseDir, "checkpoint"); + inflightTakes = new File(baseDir, "inflightputs"); + inflightPuts = new File(baseDir, "inflighttakes"); + } + File getCheckpoint() { + return checkpoint; + } + File getInflightPuts() { + return inflightPuts; + } + File getInflightTakes() { + return inflightTakes; + } + void delete() { + FileUtils.deleteQuietly(baseDir); + } + abstract EventQueueBackingStore get() throws Exception ; + } + + @Parameters + public static Collection<Object[]> data() throws IOException { + Object[][] data = new Object[][] { { + new EventQueueBackingStoreSupplier() { + @Override + public EventQueueBackingStore get() throws IOException { + Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs()); + return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000, + "test"); + } + } + }, { + new EventQueueBackingStoreSupplier() { + @Override + public EventQueueBackingStore get() throws IOException { + Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs()); + return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000, + "test"); + } + } + } }; + return Arrays.asList(data); + } + + public TestFlumeEventQueue(EventQueueBackingStoreSupplier backingStoreSupplier) { + this.backingStoreSupplier = backingStoreSupplier; + } @Before public void setup() throws Exception { - file = File.createTempFile("Checkpoint", ""); - inflightTakes = File.createTempFile("inflighttakes", ""); - inflightPuts = File.createTempFile("inflightputs", ""); + backingStore = backingStoreSupplier.get(); } - @Test - public void testQueueIsEmptyAfterCreation() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); - Assert.assertNull(queue.removeHead(0)); + @After + public void cleanup() throws IOException { + if(backingStore != null) { + backingStore.close(); + } + backingStoreSupplier.delete(); } @Test public void testCapacity() throws Exception { - queue 
= new FlumeEventQueue(1, file, inflightTakes, inflightPuts,"test"); + backingStore.close(); + File checkpoint = backingStoreSupplier.getCheckpoint(); + Assert.assertTrue(checkpoint.delete()); + backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertFalse(queue.addTail(pointer2)); } @Test(expected=IllegalArgumentException.class) public void testInvalidCapacityZero() throws Exception { - queue = new FlumeEventQueue(0, file, inflightTakes, inflightPuts,"test"); + backingStore.close(); + File checkpoint = backingStoreSupplier.getCheckpoint(); + Assert.assertTrue(checkpoint.delete()); + backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); } @Test(expected=IllegalArgumentException.class) public void testInvalidCapacityNegative() throws Exception { - queue = new FlumeEventQueue(-1, file, inflightTakes, inflightPuts,"test"); + backingStore.close(); + File checkpoint = backingStoreSupplier.getCheckpoint(); + Assert.assertTrue(checkpoint.delete()); + backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); + } + @Test + public void testQueueIsEmptyAfterCreation() throws Exception { + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); + Assert.assertNull(queue.removeHead(0L)); } @Test public void addTail1() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + 
backingStoreSupplier.getInflightPuts()); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertEquals(pointer1, queue.removeHead(0)); Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs()); } @Test public void addTail2() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertTrue(queue.addTail(pointer2)); Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs()); @@ -81,7 +172,9 @@ public class TestFlumeEventQueue { } @Test public void addTailLarge() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); int size = 500; Set<Integer> fileIDs = Sets.newHashSet(); for (int i = 1; i <= size; i++) { @@ -98,7 +191,9 @@ public class TestFlumeEventQueue { } @Test public void addHead1() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); Assert.assertTrue(queue.addHead(pointer1)); Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs()); Assert.assertEquals(pointer1, queue.removeHead(0)); @@ -106,7 +201,9 @@ public class TestFlumeEventQueue { } @Test public void addHead2() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); Assert.assertTrue(queue.addHead(pointer1)); Assert.assertTrue(queue.addHead(pointer2)); Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs()); @@ -115,7 +212,9 @@ public class 
TestFlumeEventQueue { } @Test public void addHeadLarge() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); int size = 500; Set<Integer> fileIDs = Sets.newHashSet(); for (int i = 1; i <= size; i++) { @@ -132,7 +231,9 @@ public class TestFlumeEventQueue { } @Test public void addTailRemove1() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs()); Assert.assertTrue(queue.remove(pointer1)); @@ -143,7 +244,9 @@ public class TestFlumeEventQueue { @Test public void addTailRemove2() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertTrue(queue.addTail(pointer2)); Assert.assertTrue(queue.remove(pointer1)); @@ -152,14 +255,18 @@ public class TestFlumeEventQueue { @Test public void addHeadRemove1() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); queue.addHead(pointer1); Assert.assertTrue(queue.remove(pointer1)); Assert.assertNull(queue.removeHead(0)); } @Test public void addHeadRemove2() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); 
Assert.assertTrue(queue.addHead(pointer1)); Assert.assertTrue(queue.addHead(pointer2)); Assert.assertTrue(queue.remove(pointer1)); @@ -167,7 +274,9 @@ public class TestFlumeEventQueue { } @Test public void testWrappingCorrectly() throws Exception { - queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); int size = Integer.MAX_VALUE; for (int i = 1; i <= size; i++) { if(!queue.addHead(new FlumeEventPointer(i, i))) { @@ -187,7 +296,9 @@ public class TestFlumeEventQueue { } @Test public void testInflightPuts() throws Exception{ - queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1); long txnID2 = txnID1 + 1; queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1); @@ -195,7 +306,9 @@ public class TestFlumeEventQueue { queue.addWithoutCommit(new FlumeEventPointer(2, 2), txnID2); queue.checkpoint(true); TimeUnit.SECONDS.sleep(3L); - queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts(); Assert.assertTrue(deserializedMap.get( txnID1).contains(new FlumeEventPointer(1, 1).toLong())); @@ -207,7 +320,9 @@ public class TestFlumeEventQueue { @Test public void testInflightTakes() throws Exception { - queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test"); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1); long txnID2 = txnID1 + 1; 
queue.addTail(new FlumeEventPointer(1, 1)); @@ -218,8 +333,10 @@ public class TestFlumeEventQueue { queue.removeHead(txnID2); queue.checkpoint(true); TimeUnit.SECONDS.sleep(3L); - queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test"); - SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes(); + queue = new FlumeEventQueue(backingStore, + backingStoreSupplier.getInflightTakes(), + backingStoreSupplier.getInflightPuts()); + SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes(); Assert.assertTrue(deserializedMap.get( txnID1).contains(new FlumeEventPointer(1, 1).toLong())); Assert.assertTrue(deserializedMap.get(
http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java index e923a30..e3eb184 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java @@ -100,8 +100,8 @@ public class TestLog { } } } - // 67 files with TestLog.MAX_FILE_SIZE=1000 - Assert.assertEquals(78, logCount); + // 78 (*2 for meta) files with TestLog.MAX_FILE_SIZE=1000 + Assert.assertEquals(156, logCount); } /** * After replay of the log, we should find the event because the put http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java index 193cd2b..11d0be0 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java @@ -21,7 +21,9 @@ package org.apache.flume.channel.file; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -34,7 +36,6 @@ import org.junit.Before; import 
org.junit.Test; import com.google.common.base.Throwables; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; @@ -51,7 +52,7 @@ public class TestLogFile { dataDir = Files.createTempDir(); dataFile = new File(dataDir, String.valueOf(fileID)); Assert.assertTrue(dataDir.isDirectory()); - logFileWriter = new LogFile.Writer(dataFile, fileID, 1000); + logFileWriter = LogFileFactory.getWriter(dataFile, fileID, 1000); } @After public void cleanup() throws IOException { @@ -63,20 +64,25 @@ public class TestLogFile { } @Test public void testPutGet() throws InterruptedException, IOException { - final List<Throwable> errors = Lists.newArrayList(); + final List<Throwable> errors = + Collections.synchronizedList(new ArrayList<Throwable>()); ExecutorService executorService = Executors.newFixedThreadPool(10); - final LogFile.RandomReader logFileReader = new LogFile.RandomReader(dataFile); + final LogFile.RandomReader logFileReader = + LogFileFactory.getRandomReader(dataFile); for (int i = 0; i < 1000; i++) { // first try and throw failures - for(Throwable throwable : errors) { - Throwables.propagateIfInstanceOf(throwable, AssertionError.class); - } - // then throw errors - for(Throwable throwable : errors) { - Throwables.propagate(throwable); + synchronized (errors) { + for(Throwable throwable : errors) { + Throwables.propagateIfInstanceOf(throwable, AssertionError.class); + } + // then throw errors + for(Throwable throwable : errors) { + Throwables.propagate(throwable); + } } final FlumeEvent eventIn = TestUtils.newPersistableEvent(); - final Put put = new Put(++transactionID, eventIn); + final Put put = new Put(++transactionID, WriteOrderOracle.next(), + eventIn); ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); FlumeEventPointer ptr = logFileWriter.put(bytes); final int offset = ptr.getOffset(); @@ -110,12 +116,14 @@ public class TestLogFile { Map<Integer, Put> puts = Maps.newHashMap(); for (int i = 0; i 
< 1000; i++) { FlumeEvent eventIn = TestUtils.newPersistableEvent(); - Put put = new Put(++transactionID, eventIn); + Put put = new Put(++transactionID, WriteOrderOracle.next(), + eventIn); ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); FlumeEventPointer ptr = logFileWriter.put(bytes); puts.put(ptr.getOffset(), put); } - LogFile.SequentialReader reader = new LogFile.SequentialReader(dataFile); + LogFile.SequentialReader reader = + LogFileFactory.getSequentialReader(dataFile); LogRecord entry; while((entry = reader.next()) != null) { Integer offset = entry.getOffset(); http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogRecord.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogRecord.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogRecord.java index 9f6adc7..04b6ea9 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogRecord.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogRecord.java @@ -33,8 +33,10 @@ public class TestLogRecord { @Test public void testConstructor() { long now = System.currentTimeMillis(); - Commit commit = new Commit(now); + Commit commit = new Commit(now, now + 1); LogRecord logRecord = new LogRecord(1, 2, commit); + Assert.assertTrue(now == commit.getTransactionID()); + Assert.assertTrue(now + 1 == commit.getLogWriteOrderID()); Assert.assertTrue(1 == logRecord.getFileID()); Assert.assertTrue(2 == logRecord.getOffset()); Assert.assertTrue(commit == logRecord.getEvent()); @@ -46,8 +48,7 @@ public class TestLogRecord { long now = System.currentTimeMillis(); List<LogRecord> records = Lists.newArrayList(); for (int i = 0; i < 3; i++) { - Commit commit = new Commit((long)i); - 
commit.setLogWriteOrderID(now - i); + Commit commit = new Commit((long)i, now - i); LogRecord logRecord = new LogRecord(1, i, commit); records.add(logRecord); } http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecord.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecord.java deleted file mode 100644 index c73b11b..0000000 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecord.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.flume.channel.file; - -import static org.mockito.Mockito.*; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.HashMap; - -import junit.framework.Assert; - -import org.junit.Test; - -public class TestTransactionEventRecord { - - @Test - public void testTypes() throws IOException { - Put put = new Put(System.currentTimeMillis()); - Assert.assertEquals(TransactionEventRecord.Type.PUT.get(), put.getRecordType()); - - Take take = new Take(System.currentTimeMillis()); - Assert.assertEquals(TransactionEventRecord.Type.TAKE.get(), take.getRecordType()); - - Rollback rollback = new Rollback(System.currentTimeMillis()); - Assert.assertEquals(TransactionEventRecord.Type.ROLLBACK.get(), rollback.getRecordType()); - - Commit commit = new Commit(System.currentTimeMillis()); - Assert.assertEquals(TransactionEventRecord.Type.COMMIT.get(), commit.getRecordType()); - } - - @Test - public void testPutSerialization() throws IOException { - Put in = new Put(System.currentTimeMillis(), - new FlumeEvent(new HashMap<String, String>(), new byte[0])); - in.setLogWriteOrderID(System.currentTimeMillis()); - Put out = (Put)TransactionEventRecord.fromDataInput(toDataInput(in)); - Assert.assertEquals(in.getClass(), out.getClass()); - Assert.assertEquals(in.getRecordType(), out.getRecordType()); - Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); - Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); - Assert.assertEquals(in.getEvent().getHeaders(), out.getEvent().getHeaders()); - Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody())); - } - @Test - public void testTakeSerialization() throws IOException { - Take in = new Take(System.currentTimeMillis(), 10, 20); - 
in.setLogWriteOrderID(System.currentTimeMillis()); - Take out = (Take)TransactionEventRecord.fromDataInput(toDataInput(in)); - Assert.assertEquals(in.getClass(), out.getClass()); - Assert.assertEquals(in.getRecordType(), out.getRecordType()); - Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); - Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); - Assert.assertEquals(in.getFileID(), out.getFileID()); - Assert.assertEquals(in.getOffset(), out.getOffset()); - } - - @Test - public void testRollbackSerialization() throws IOException { - Rollback in = new Rollback(System.currentTimeMillis()); - in.setLogWriteOrderID(System.currentTimeMillis()); - Rollback out = (Rollback)TransactionEventRecord.fromDataInput(toDataInput(in)); - Assert.assertEquals(in.getClass(), out.getClass()); - Assert.assertEquals(in.getRecordType(), out.getRecordType()); - Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); - Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); - } - - @Test - public void testCommitSerialization() throws IOException { - Commit in = new Commit(System.currentTimeMillis()); - in.setLogWriteOrderID(System.currentTimeMillis()); - Commit out = (Commit)TransactionEventRecord.fromDataInput(toDataInput(in)); - Assert.assertEquals(in.getClass(), out.getClass()); - Assert.assertEquals(in.getRecordType(), out.getRecordType()); - Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); - Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); - } - - @Test - public void testBadHeader() throws IOException { - Put in = new Put(System.currentTimeMillis(), - new FlumeEvent(new HashMap<String, String>(), new byte[0])); - in.setLogWriteOrderID(System.currentTimeMillis()); - try { - TransactionEventRecord.fromDataInput(toDataInput(0, in)); - Assert.fail(); - } catch (IOException e) { - Assert.assertEquals("Header 0 is not the required value: deadbeef", - e.getMessage()); - } - } - - 
@Test - public void testBadType() throws IOException { - TransactionEventRecord in = mock(TransactionEventRecord.class); - when(in.getRecordType()).thenReturn(Short.MIN_VALUE); - try { - TransactionEventRecord.fromDataInput(toDataInput(in)); - Assert.fail(); - } catch(NullPointerException e) { - Assert.assertEquals("Unknown action ffff8000", e.getMessage()); - } - } - - private DataInput toDataInput(TransactionEventRecord record) throws IOException { - ByteBuffer buffer = TransactionEventRecord.toByteBuffer(record); - ByteArrayInputStream byteInput = new ByteArrayInputStream(buffer.array()); - DataInputStream dataInput = new DataInputStream(byteInput); - return dataInput; - } - private DataInput toDataInput(int header, TransactionEventRecord record) throws IOException { - ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(); - DataOutputStream dataOutput = new DataOutputStream(byteOutput); - dataOutput.writeInt(header); - dataOutput.writeShort(record.getRecordType()); - dataOutput.writeLong(record.getTransactionID()); - record.write(dataOutput); - ByteArrayInputStream byteInput = new ByteArrayInputStream(byteOutput.toByteArray()); - DataInputStream dataInput = new DataInputStream(byteInput); - return dataInput; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java new file mode 100644 index 0000000..2356d90 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import static org.mockito.Mockito.*; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; + +import junit.framework.Assert; + +import org.junit.Test; + +@SuppressWarnings("deprecation") +public class TestTransactionEventRecordV2 { + + @Test + public void testTypes() throws IOException { + Put put = new Put(System.currentTimeMillis(), WriteOrderOracle.next()); + Assert.assertEquals(TransactionEventRecord.Type.PUT.get(), + put.getRecordType()); + + Take take = new Take(System.currentTimeMillis(), WriteOrderOracle.next()); + Assert.assertEquals(TransactionEventRecord.Type.TAKE.get(), + take.getRecordType()); + + Rollback rollback = new Rollback(System.currentTimeMillis(), + WriteOrderOracle.next()); + Assert.assertEquals(TransactionEventRecord.Type.ROLLBACK.get(), + rollback.getRecordType()); + + Commit commit = new Commit(System.currentTimeMillis(), + WriteOrderOracle.next()); + 
Assert.assertEquals(TransactionEventRecord.Type.COMMIT.get(), + commit.getRecordType()); + } + + @Test + public void testPutSerialization() throws IOException { + Put in = new Put(System.currentTimeMillis(), + WriteOrderOracle.next(), + new FlumeEvent(new HashMap<String, String>(), new byte[0])); + Put out = (Put)TransactionEventRecord.fromDataInputV2(toDataInput(in)); + Assert.assertEquals(in.getClass(), out.getClass()); + Assert.assertEquals(in.getRecordType(), out.getRecordType()); + Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); + Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); + Assert.assertEquals(in.getEvent().getHeaders(), out.getEvent().getHeaders()); + Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody())); + } + @Test + public void testTakeSerialization() throws IOException { + Take in = new Take(System.currentTimeMillis(), + WriteOrderOracle.next(), 10, 20); + Take out = (Take)TransactionEventRecord.fromDataInputV2(toDataInput(in)); + Assert.assertEquals(in.getClass(), out.getClass()); + Assert.assertEquals(in.getRecordType(), out.getRecordType()); + Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); + Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); + Assert.assertEquals(in.getFileID(), out.getFileID()); + Assert.assertEquals(in.getOffset(), out.getOffset()); + } + + @Test + public void testRollbackSerialization() throws IOException { + Rollback in = new Rollback(System.currentTimeMillis(), + WriteOrderOracle.next()); + Rollback out = (Rollback)TransactionEventRecord.fromDataInputV2(toDataInput(in)); + Assert.assertEquals(in.getClass(), out.getClass()); + Assert.assertEquals(in.getRecordType(), out.getRecordType()); + Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); + Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); + } + + @Test + public void testCommitSerialization() throws IOException { + Commit 
in = new Commit(System.currentTimeMillis(), + WriteOrderOracle.next()); + Commit out = (Commit)TransactionEventRecord.fromDataInputV2(toDataInput(in)); + Assert.assertEquals(in.getClass(), out.getClass()); + Assert.assertEquals(in.getRecordType(), out.getRecordType()); + Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); + Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); + } + + /** Feeds fromDataInputV2 a Put whose leading header int is 0 and expects an IOException carrying exactly the message asserted in the catch block. */ @Test + public void testBadHeader() throws IOException { + Put in = new Put(System.currentTimeMillis(), + WriteOrderOracle.next(), + new FlumeEvent(new HashMap<String, String>(), new byte[0])); + try { + TransactionEventRecord.fromDataInputV2(toDataInput(0, in)); + /* reached only when no exception was thrown */ Assert.fail(); + } catch (IOException e) { + Assert.assertEquals("Header 0 is not the required value: deadbeef", + e.getMessage()); + } + } + + /** Mocks a record whose getRecordType() returns Short.MIN_VALUE and expects fromDataInputV2 to throw a NullPointerException with the message asserted below (ffff8000 is Short.MIN_VALUE widened to int and printed in hex). */ @Test + public void testBadType() throws IOException { + TransactionEventRecord in = mock(TransactionEventRecord.class); + when(in.getRecordType()).thenReturn(Short.MIN_VALUE); + try { + TransactionEventRecord.fromDataInputV2(toDataInput(in)); + /* reached only when no exception was thrown */ Assert.fail(); + } catch(NullPointerException e) { + Assert.assertEquals("Unknown action ffff8000", e.getMessage()); + } + } + + /** Serializes the record with toByteBufferV2 and exposes the backing array of the returned buffer as a DataInput. NOTE(review): ByteBuffer.array() ignores position and limit, so this presumably relies on toByteBufferV2 returning an exactly sized, array-backed buffer; confirm. */ private DataInput toDataInput(TransactionEventRecord record) throws IOException { + ByteBuffer buffer = TransactionEventRecord.toByteBufferV2(record); + ByteArrayInputStream byteInput = new ByteArrayInputStream(buffer.array()); + DataInputStream dataInput = new DataInputStream(byteInput); + return dataInput; + } + /** Hand-writes a record as: int header, short record type, long transaction ID, long log write order ID, then record.write(), so that a test can supply an arbitrary header value. */ private DataInput toDataInput(int header, TransactionEventRecord record) throws IOException { + ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(); + DataOutputStream dataOutput = new DataOutputStream(byteOutput); + dataOutput.writeInt(header); + dataOutput.writeShort(record.getRecordType()); + dataOutput.writeLong(record.getTransactionID()); + dataOutput.writeLong(record.getLogWriteOrderID()); + record.write(dataOutput); + ByteArrayInputStream byteInput = new 
ByteArrayInputStream(byteOutput.toByteArray()); + DataInputStream dataInput = new DataInputStream(byteInput); + return dataInput; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java new file mode 100644 index 0000000..a9866a0 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.channel.file; + +import static org.mockito.Mockito.*; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import junit.framework.Assert; + +import org.junit.Test; + +/** Tests for the TransactionEventRecord.toByteBuffer and fromInputStream pair: record type codes, round trips of Put, Take, Rollback and Commit records, and the failure raised for an unknown record type. */ public class TestTransactionEventRecordV3 { + + /** Checks that Put, Take, Rollback and Commit each report the matching TransactionEventRecord.Type value from getRecordType(). */ @Test + public void testTypes() throws IOException { + Put put = new Put(System.currentTimeMillis(), WriteOrderOracle.next()); + Assert.assertEquals(TransactionEventRecord.Type.PUT.get(), + put.getRecordType()); + + Take take = new Take(System.currentTimeMillis(), WriteOrderOracle.next()); + Assert.assertEquals(TransactionEventRecord.Type.TAKE.get(), + take.getRecordType()); + + Rollback rollback = new Rollback(System.currentTimeMillis(), + WriteOrderOracle.next()); + Assert.assertEquals(TransactionEventRecord.Type.ROLLBACK.get(), + rollback.getRecordType()); + + Commit commit = new Commit(System.currentTimeMillis(), + WriteOrderOracle.next()); + Assert.assertEquals(TransactionEventRecord.Type.COMMIT.get(), + commit.getRecordType()); + } + /** Round-trips a Put carrying one header entry and an empty body, then compares class, record type, both IDs, headers and body. */ @Test + public void testPutSerialization() throws IOException { + Map<String, String> headers = new HashMap<String, String>(); + headers.put("key", "value"); + Put in = new Put(System.currentTimeMillis(), + WriteOrderOracle.next(), + new FlumeEvent(headers, new byte[0])); + Put out = (Put)TransactionEventRecord.fromInputStream(toInputStream(in)); + Assert.assertEquals(in.getClass(), out.getClass()); + Assert.assertEquals(in.getRecordType(), out.getRecordType()); + Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); + Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); + Assert.assertEquals(in.getEvent().getHeaders(), out.getEvent().getHeaders()); + Assert.assertEquals(headers, in.getEvent().getHeaders()); + Assert.assertEquals(headers, out.getEvent().getHeaders()); + 
Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody())); + } + /** Round-trips a Put whose event was built with null headers: the input still reports null headers, while the deserialized copy must report non-null headers. */ @Test + public void testPutSerializationNullHeader() throws IOException { + Put in = new Put(System.currentTimeMillis(), + WriteOrderOracle.next(), + new FlumeEvent(null, new byte[0])); + Put out = (Put)TransactionEventRecord.fromInputStream(toInputStream(in)); + Assert.assertEquals(in.getClass(), out.getClass()); + Assert.assertEquals(in.getRecordType(), out.getRecordType()); + Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); + Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); + Assert.assertNull(in.getEvent().getHeaders()); + Assert.assertNotNull(out.getEvent().getHeaders()); + Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody())); + } + /** Round-trips a Take built with the extra constructor arguments 10 and 20, then compares class, record type, both IDs, file ID and offset. */ @Test + public void testTakeSerialization() throws IOException { + Take in = new Take(System.currentTimeMillis(), + WriteOrderOracle.next(), 10, 20); + Take out = (Take)TransactionEventRecord.fromInputStream(toInputStream(in)); + Assert.assertEquals(in.getClass(), out.getClass()); + Assert.assertEquals(in.getRecordType(), out.getRecordType()); + Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); + Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); + Assert.assertEquals(in.getFileID(), out.getFileID()); + Assert.assertEquals(in.getOffset(), out.getOffset()); + } + + /** Round-trips a Rollback and compares class, record type, transaction ID and log write order ID. */ @Test + public void testRollbackSerialization() throws IOException { + Rollback in = new Rollback(System.currentTimeMillis(), + WriteOrderOracle.next()); + Rollback out = (Rollback)TransactionEventRecord.fromInputStream(toInputStream(in)); + Assert.assertEquals(in.getClass(), out.getClass()); + Assert.assertEquals(in.getRecordType(), out.getRecordType()); + Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); + Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); + } + + /** Round-trips a Commit and compares class, record type, transaction ID and log write order ID. */ @Test + public void testCommitSerialization() throws IOException { + 
Commit in = new Commit(System.currentTimeMillis(), + WriteOrderOracle.next()); + Commit out = (Commit)TransactionEventRecord.fromInputStream(toInputStream(in)); + Assert.assertEquals(in.getClass(), out.getClass()); + Assert.assertEquals(in.getRecordType(), out.getRecordType()); + Assert.assertEquals(in.getTransactionID(), out.getTransactionID()); + Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID()); + } + + /** Mocks a record whose getRecordType() returns Short.MIN_VALUE and expects fromInputStream to throw a NullPointerException with the message asserted below (ffff8000 is Short.MIN_VALUE widened to int and printed in hex). */ @Test + public void testBadType() throws IOException { + TransactionEventRecord in = mock(TransactionEventRecord.class); + when(in.getRecordType()).thenReturn(Short.MIN_VALUE); + try { + TransactionEventRecord.fromInputStream(toInputStream(in)); + /* reached only when no exception was thrown */ Assert.fail(); + } catch(NullPointerException e) { + Assert.assertEquals("Unknown action ffff8000", e.getMessage()); + } + } + + /** Serializes the record with toByteBuffer and exposes the backing array of the returned buffer as an InputStream. NOTE(review): ByteBuffer.array() ignores position and limit, so this presumably relies on toByteBuffer returning an exactly sized, array-backed buffer; confirm. */ private InputStream toInputStream(TransactionEventRecord record) throws IOException { + ByteBuffer buffer = TransactionEventRecord.toByteBuffer(record); + ByteArrayInputStream byteInput = new ByteArrayInputStream(buffer.array()); + return byteInput; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java index e64f856..48948e4 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java @@ -18,27 +18,34 @@ */ package org.apache.flume.channel.file; -import com.google.common.base.Charsets; +import static org.fest.reflect.core.Reflection.*; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import 
java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.net.URL; import java.util.Map; - -import org.apache.hadoop.io.Writable; - -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import java.util.Set; import java.util.UUID; +import java.util.zip.GZIPInputStream; + import org.apache.flume.Channel; import org.apache.flume.Event; import org.apache.flume.Transaction; import org.apache.flume.event.EventBuilder; +import org.apache.hadoop.io.Writable; import org.junit.Assert; -import static org.fest.reflect.core.Reflection.*; + +import com.google.common.base.Charsets; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.io.ByteStreams; +import com.google.common.io.Resources; public class TestUtils { @@ -162,4 +169,29 @@ public class TestUtils { } return result; } + /** + * Decompresses the gzip-compressed classpath resource named {@code resource} + * into {@code output}. Both streams are closed before returning, whether or + * not the copy succeeds. + * + * @param resource name handed to Resources.getResource + * @param output file that receives the decompressed bytes + * @throws IOException if a stream cannot be opened, read, written or closed + */ + public static void copyDecompressed(String resource, File output) + throws IOException { + URL input = Resources.getResource(resource); + GZIPInputStream in = new GZIPInputStream(input.openStream()); + try { + FileOutputStream out = new FileOutputStream(output); + try { + /* ByteStreams.copy does not close either stream, so close them here. */ + ByteStreams.copy(in, out); + } finally { + out.close(); + } + } finally { + in.close(); + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index aaf944f..1c267e9 100644 --- a/pom.xml +++ b/pom.xml @@ -673,6 +673,13 @@ limitations under the License. </dependency> <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <scope>compile</scope> + <version>2.4.1</version> + </dependency> + + <dependency> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api</artifactId> <version>2.5-20110124</version>
