Updated Branches: refs/heads/flume-1.4 48504e8f3 -> 5e53a056b
http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto index 1e668d2..25520e8 100644 --- a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto +++ b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto @@ -56,6 +56,7 @@ message TransactionEventHeader { message Put { required FlumeEvent event = 1; + optional sfixed64 checksum = 2; } message Take { http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 0f7d14d..25765b5 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -21,13 +21,17 @@ package org.apache.flume.channel.file; import static org.apache.flume.channel.file.TestUtils.*; import static org.fest.reflect.core.Reflection.*; +import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; +import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -487,7 +491,7 @@ public class TestFileChannel 
extends TestFileChannelBase { public void testReferenceCounts() throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000"); - overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "100"); + overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "150"); final FileChannel channel = createFileChannel(overrides); channel.start(); putEvents(channel, "testing-reference-counting", 1, 15); @@ -568,4 +572,42 @@ public class TestFileChannel extends TestFileChannelBase { } + @Test (expected = IllegalStateException.class) + public void testChannelDiesOnCorruptEvent() throws Exception { + final FileChannel channel = createFileChannel(); + channel.start(); + putEvents(channel,"test-corrupt-event",100,100); + for(File dataDir : dataDirs) { + File[] files = dataDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if(!name.endsWith("meta") && !name.contains("lock")){ + return true; + } + return false; + } + }); + if (files != null && files.length > 0) { + for (int j = 0; j < files.length; j++) { + RandomAccessFile fileToCorrupt = new RandomAccessFile(files[0], "rw"); + fileToCorrupt.seek(50); + fileToCorrupt.writeByte(234); + fileToCorrupt.close(); + } + } + } + try { + consumeChannel(channel, true); + } catch (IllegalStateException ex) { + // The rollback call in takeEvents() in TestUtils will cause an + // IllegalStateException - and this should be tested to verify the + // channel is completely stopped. 
+ Assert.assertTrue(ex.getMessage().contains("Log is closed")); + throw ex; + } + Assert.fail(); + + + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java index 54978f8..8a5f8ad 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java @@ -74,7 +74,8 @@ public class TestLog { * not transactional so the commit is not required. */ @Test - public void testPutGet() throws IOException, InterruptedException { + public void testPutGet() + throws IOException, InterruptedException, NoopRecordException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointer = log.put(transactionID, eventIn); @@ -86,7 +87,8 @@ public class TestLog { Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody()); } @Test - public void testRoll() throws IOException, InterruptedException { + public void testRoll() + throws IOException, InterruptedException, NoopRecordException { log.shutdownWorker(); Thread.sleep(1000); for (int i = 0; i < 1000; i++) { @@ -107,15 +109,16 @@ public class TestLog { } } } - // 78 (*2 for meta) files with TestLog.MAX_FILE_SIZE=1000 - Assert.assertEquals(156, logCount); + // 93 (*2 for meta) files with TestLog.MAX_FILE_SIZE=1000 + Assert.assertEquals(186, logCount); } /** * After replay of the log, we should find the event because the put * was committed */ @Test - public void testPutCommit() throws IOException, InterruptedException { + public void testPutCommit() + throws 
IOException, InterruptedException, NoopRecordException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointerIn = log.put(transactionID, eventIn); @@ -243,16 +246,16 @@ public class TestLog { */ @Test public void testPutTakeRollbackLogReplayV1() - throws IOException, InterruptedException { + throws IOException, InterruptedException, NoopRecordException { doPutTakeRollback(true); } @Test public void testPutTakeRollbackLogReplayV2() - throws IOException, InterruptedException { + throws IOException, InterruptedException, NoopRecordException { doPutTakeRollback(false); } public void doPutTakeRollback(boolean useLogReplayV1) - throws IOException, InterruptedException { + throws IOException, InterruptedException, NoopRecordException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long putTransactionID = ++transactionID; FlumeEventPointer eventPointerIn = log.put(putTransactionID, eventIn); @@ -392,7 +395,7 @@ public class TestLog { } @Test public void testReplaySucceedsWithUnusedEmptyLogMetaDataNormalReplay() - throws IOException, InterruptedException { + throws IOException, InterruptedException, NoopRecordException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointer = log.put(transactionID, eventIn); @@ -406,7 +409,7 @@ public class TestLog { } @Test public void testReplaySucceedsWithUnusedEmptyLogMetaDataFastReplay() - throws IOException, InterruptedException { + throws IOException, InterruptedException, NoopRecordException { FlumeEvent eventIn = TestUtils.newPersistableEvent(); long transactionID = ++this.transactionID; FlumeEventPointer eventPointer = log.put(transactionID, eventIn); @@ -422,7 +425,7 @@ public class TestLog { } public void doTestReplaySucceedsWithUnusedEmptyLogMetaData(FlumeEvent eventIn, FlumeEventPointer eventPointer) throws IOException, - InterruptedException { + InterruptedException, 
NoopRecordException { for (int i = 0; i < dataDirs.length; i++) { for(File logFile : LogUtils.getLogs(dataDirs[i])) { if(logFile.length() == 0L) { @@ -461,7 +464,8 @@ public class TestLog { } private void takeAndVerify(FlumeEventPointer eventPointerIn, - FlumeEvent eventIn) throws IOException, InterruptedException { + FlumeEvent eventIn) + throws IOException, InterruptedException, NoopRecordException { FlumeEventQueue queue = log.getFlumeEventQueue(); FlumeEventPointer eventPointerOut = queue.removeHead(0); Assert.assertNotNull(eventPointerOut); http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java index bef22ef..4da6ac1 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java @@ -21,12 +21,16 @@ package org.apache.flume.channel.file; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -98,6 +102,8 @@ public class TestLogFile { final List<Throwable> errors = Collections.synchronizedList(new ArrayList<Throwable>()); ExecutorService executorService = Executors.newFixedThreadPool(10); + 
CompletionService<Void> completionService = new ExecutorCompletionService + <Void>(executorService); final LogFile.RandomReader logFileReader = LogFileFactory.getRandomReader(dataFile, null); for (int i = 0; i < 1000; i++) { @@ -117,7 +123,7 @@ public class TestLogFile { ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); FlumeEventPointer ptr = logFileWriter.put(bytes); final int offset = ptr.getOffset(); - executorService.submit(new Runnable() { + completionService.submit(new Runnable() { @Override public void run() { try { @@ -130,7 +136,11 @@ public class TestLogFile { } } } - }); + }, null); + } + + for(int i = 0; i < 1000; i++) { + completionService.take(); } // first try and throw failures for(Throwable throwable : errors) { @@ -142,7 +152,8 @@ public class TestLogFile { } } @Test - public void testReader() throws InterruptedException, IOException { + public void testReader() throws InterruptedException, IOException, + CorruptEventException { Map<Integer, Put> puts = Maps.newHashMap(); for (int i = 0; i < 1000; i++) { FlumeEvent eventIn = TestUtils.newPersistableEvent(); @@ -169,7 +180,8 @@ public class TestLogFile { } @Test - public void testReaderOldMetaFile() throws InterruptedException, IOException { + public void testReaderOldMetaFile() throws InterruptedException, + IOException, CorruptEventException { Map<Integer, Put> puts = Maps.newHashMap(); for (int i = 0; i < 1000; i++) { FlumeEvent eventIn = TestUtils.newPersistableEvent(); @@ -204,7 +216,8 @@ public class TestLogFile { } @Test - public void testReaderTempMetaFile() throws InterruptedException, IOException { + public void testReaderTempMetaFile() throws InterruptedException, + IOException, CorruptEventException { Map<Integer, Put> puts = Maps.newHashMap(); for (int i = 0; i < 1000; i++) { FlumeEvent eventIn = TestUtils.newPersistableEvent(); @@ -260,4 +273,92 @@ public class TestLogFile { Assert.assertEquals(3, metaData.getCheckpointPosition()); Assert.assertEquals(4, 
metaData.getCheckpointWriteOrderID()); } + + @Test (expected = CorruptEventException.class) + public void testPutGetCorruptEvent() throws Exception { + final LogFile.RandomReader logFileReader = + LogFileFactory.getRandomReader(dataFile, null); + final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500); + final Put put = new Put(++transactionID, WriteOrderOracle.next(), + eventIn); + ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); + FlumeEventPointer ptr = logFileWriter.put(bytes); + logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit + (transactionID, WriteOrderOracle.next()))); + final int offset = ptr.getOffset(); + RandomAccessFile writer = new RandomAccessFile(dataFile, "rw"); + writer.seek(offset + 1500); + writer.write((byte) 45); + writer.write((byte) 12); + writer.getFD().sync(); + logFileReader.get(offset); + + // Should have thrown an exception by now. + Assert.fail(); + + } + + @Test (expected = NoopRecordException.class) + public void testPutGetNoopEvent() throws Exception { + final LogFile.RandomReader logFileReader = + LogFileFactory.getRandomReader(dataFile, null); + final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500); + final Put put = new Put(++transactionID, WriteOrderOracle.next(), + eventIn); + ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); + FlumeEventPointer ptr = logFileWriter.put(bytes); + logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit + (transactionID, WriteOrderOracle.next()))); + final int offset = ptr.getOffset(); + LogFile.OperationRecordUpdater updater = new LogFile + .OperationRecordUpdater(dataFile); + updater.markRecordAsNoop(offset); + logFileReader.get(offset); + + // Should have thrown an exception by now. 
+ Assert.fail(); + } + + @Test + public void testOperationRecordUpdater() throws Exception { + File tempDir = Files.createTempDir(); + File temp = new File(tempDir, "temp"); + final RandomAccessFile tempFile = new RandomAccessFile(temp, "rw"); + for(int i = 0; i < 5000; i++) { + tempFile.write(LogFile.OP_RECORD); + } + tempFile.seek(0); + LogFile.OperationRecordUpdater recordUpdater = new LogFile + .OperationRecordUpdater(temp); + //Convert every 10th byte into a noop byte + for(int i = 0; i < 5000; i+=10) { + recordUpdater.markRecordAsNoop(i); + } + recordUpdater.close(); + + tempFile.seek(0); + // Verify every 10th byte is actually a NOOP + for(int i = 0; i < 5000; i+=10) { + tempFile.seek(i); + Assert.assertEquals(LogFile.OP_NOOP, tempFile.readByte()); + } + + } + + @Test + public void testOpRecordUpdaterWithFlumeEvents() throws Exception{ + final FlumeEvent eventIn = TestUtils.newPersistableEvent(2500); + final Put put = new Put(++transactionID, WriteOrderOracle.next(), + eventIn); + ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put); + FlumeEventPointer ptr = logFileWriter.put(bytes); + logFileWriter.commit(TransactionEventRecord.toByteBuffer(new Commit + (transactionID, WriteOrderOracle.next()))); + final int offset = ptr.getOffset(); + LogFile.OperationRecordUpdater updater = new LogFile + .OperationRecordUpdater(dataFile); + updater.markRecordAsNoop(offset); + RandomAccessFile fileReader = new RandomAccessFile(dataFile, "rw"); + Assert.assertEquals(LogFile.OP_NOOP, fileReader.readByte()); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java 
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java index f403422..eb0ce04 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java @@ -53,7 +53,7 @@ public class TestTransactionEventRecordV3 { commit.getRecordType()); } @Test - public void testPutSerialization() throws IOException { + public void testPutSerialization() throws IOException, CorruptEventException { Map<String, String> headers = new HashMap<String, String>(); headers.put("key", "value"); Put in = new Put(System.currentTimeMillis(), @@ -70,7 +70,8 @@ public class TestTransactionEventRecordV3 { Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody())); } @Test - public void testPutSerializationNullHeader() throws IOException { + public void testPutSerializationNullHeader() throws IOException, + CorruptEventException { Put in = new Put(System.currentTimeMillis(), WriteOrderOracle.next(), new FlumeEvent(null, new byte[0])); @@ -84,7 +85,8 @@ public class TestTransactionEventRecordV3 { Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody())); } @Test - public void testTakeSerialization() throws IOException { + public void testTakeSerialization() throws IOException, + CorruptEventException { Take in = new Take(System.currentTimeMillis(), WriteOrderOracle.next(), 10, 20); Take out = (Take)TransactionEventRecord.fromByteArray(toByteArray(in)); @@ -97,7 +99,8 @@ public class TestTransactionEventRecordV3 { } @Test - public void testRollbackSerialization() throws IOException { + public void testRollbackSerialization() throws IOException, + CorruptEventException { Rollback in = new Rollback(System.currentTimeMillis(), WriteOrderOracle.next()); Rollback out = 
(Rollback)TransactionEventRecord.fromByteArray(toByteArray(in)); @@ -108,7 +111,8 @@ public class TestTransactionEventRecordV3 { } @Test - public void testCommitSerialization() throws IOException { + public void testCommitSerialization() throws IOException, + CorruptEventException { Commit in = new Commit(System.currentTimeMillis(), WriteOrderOracle.next()); Commit out = (Commit)TransactionEventRecord.fromByteArray(toByteArray(in)); @@ -119,7 +123,7 @@ public class TestTransactionEventRecordV3 { } @Test - public void testBadType() throws IOException { + public void testBadType() throws IOException, CorruptEventException { TransactionEventRecord in = mock(TransactionEventRecord.class); when(in.getRecordType()).thenReturn(Short.MIN_VALUE); try { http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java index 563dbcc..a5ab45a 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.net.URL; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; @@ -64,6 +65,16 @@ public class TestUtils { return event; } + public static FlumeEvent newPersistableEvent(int size) { + Map<String, String> headers = Maps.newHashMap(); + String timestamp = String.valueOf(System.currentTimeMillis()); + headers.put("timestamp", timestamp); + byte[] data = new byte[size]; + Arrays.fill(data, (byte) 54); + FlumeEvent event = new 
FlumeEvent(headers, data); + return event; + } + public static DataInput toDataInput(Writable writable) throws IOException { ByteArrayOutputStream byteOutput = new ByteArrayOutputStream(); DataOutputStream dataOutput = new DataOutputStream(byteOutput); @@ -129,25 +140,47 @@ public class TestUtils { .invoke(true)); } + public static Set<String> takeEvents(Channel channel, int batchSize) + throws Exception { + return takeEvents(channel, batchSize, false); + } + + public static Set<String> takeEvents(Channel channel, + int batchSize, boolean checkForCorruption) throws Exception { + return takeEvents(channel, batchSize, Integer.MAX_VALUE, checkForCorruption); + } + public static Set<String> takeEvents(Channel channel, - int batchSize) throws Exception { - return takeEvents(channel, batchSize, Integer.MAX_VALUE); + int batchSize, int numEvents) throws Exception { + return takeEvents(channel, batchSize, numEvents, false); } public static Set<String> takeEvents(Channel channel, - int batchSize, int numEvents) throws Exception { + int batchSize, int numEvents, boolean checkForCorruption) throws + Exception { Set<String> result = Sets.newHashSet(); for (int i = 0; i < numEvents; i += batchSize) { Transaction transaction = channel.getTransaction(); try { transaction.begin(); for (int j = 0; j < batchSize; j++) { - Event event = null; + Event event; try { event = channel.take(); } catch (ChannelException ex) { - Assert.assertTrue(ex.getMessage().startsWith( - "Take list for FileBackedTransaction, capacity")); + Throwable th = ex; + String msg; + if(checkForCorruption) { + msg = "Corrupt event found. 
Please run File Channel"; + th = ex.getCause(); + } else { + msg = "Take list for FileBackedTransaction, capacity"; + } + Assert.assertTrue(th.getMessage().startsWith( + msg)); + if(checkForCorruption) { + throw (Exception) th; + } transaction.commit(); return result; } @@ -168,15 +201,19 @@ public class TestUtils { } return result; } - public static Set<String> consumeChannel(Channel channel) - throws Exception { + + public static Set<String> consumeChannel(Channel channel) throws Exception { + return consumeChannel(channel, false); + } + public static Set<String> consumeChannel(Channel channel, + boolean checkForCorruption) throws Exception { Set<String> result = Sets.newHashSet(); int[] batchSizes = new int[] { 1000, 100, 10, 1 }; for (int i = 0; i < batchSizes.length; i++) { while(true) { - Set<String> batch = takeEvents(channel, batchSizes[i]); + Set<String> batch = takeEvents(channel, batchSizes[i], checkForCorruption); if(batch.isEmpty()) { break; } http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml index e266272..9a5f64e 100644 --- a/flume-ng-dist/pom.xml +++ b/flume-ng-dist/pom.xml @@ -137,6 +137,10 @@ <artifactId>flume-ng-log4jappender</artifactId> <classifier>jar-with-dependencies</classifier> </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-tools</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-dist/src/main/assembly/bin.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/src/main/assembly/bin.xml b/flume-ng-dist/src/main/assembly/bin.xml index 74f0608..925a54a 100644 --- a/flume-ng-dist/src/main/assembly/bin.xml +++ b/flume-ng-dist/src/main/assembly/bin.xml @@ -95,6 +95,7 @@ <exclude>flume-ng-legacy-sources/**</exclude> 
<exclude>flume-ng-clients/**</exclude> <exclude>flume-ng-embedded-agent/**</exclude> + <exclude>flume-tools/**</exclude> <exclude>**/target/**</exclude> <exclude>**/.classpath</exclude> <exclude>**/.project</exclude> http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-dist/src/main/assembly/src.xml ---------------------------------------------------------------------- diff --git a/flume-ng-dist/src/main/assembly/src.xml b/flume-ng-dist/src/main/assembly/src.xml index 7fafab8..5b86994 100644 --- a/flume-ng-dist/src/main/assembly/src.xml +++ b/flume-ng-dist/src/main/assembly/src.xml @@ -47,6 +47,7 @@ <include>org.apache.flume:flume-ng-legacy-sources</include> <include>org.apache.flume:flume-ng-clients</include> <include>org.apache.flume:flume-ng-embedded-agent</include> + <include>org.apache.flume:flume-tools</include> </includes> <sources> @@ -89,6 +90,7 @@ <exclude>flume-ng-legacy-sources/**</exclude> <exclude>flume-ng-clients/**</exclude> <exclude>flume-ng-embedded-agent/**</exclude> + <exclude>flume-tools/**</exclude> <exclude>**/target/**</exclude> <exclude>**/.classpath</exclude> <exclude>**/.project</exclude> http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/pom.xml ---------------------------------------------------------------------- diff --git a/flume-tools/pom.xml b/flume-tools/pom.xml new file mode 100644 index 0000000..386c766 --- /dev/null +++ b/flume-tools/pom.xml @@ -0,0 +1,143 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flume-parent</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.4.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.flume</groupId> + <artifactId>flume-tools</artifactId> + + <name>Flume NG Tools</name> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flume.flume-ng-channels</groupId> + <artifactId>flume-file-channel</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.easytesting</groupId> + <artifactId>fest-reflect</artifactId> + <version>1.4</version> + </dependency> + + </dependencies> + <profiles> + + <profile> + <id>hadoop-1.0</id> + <activation> + <property> + <name>!hadoop.profile</name> + </property> + </activation> + <properties> + <hadoop.version>1.0.1</hadoop.version> + 
<hadoop.common.artifact.id>hadoop-core</hadoop.common.artifact.id> + </properties> + <dependencies> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-test</artifactId> + <scope>test</scope> + </dependency> + + <!-- required because the hadoop-core pom is missing these deps + and MiniDFSCluster pulls in the webhdfs classes --> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + </profile> + + <profile> + <id>hadoop-2</id> + <activation> + <property> + <name>hadoop.profile</name> + <value>2</value> + </property> + </activation> + <properties> + <hadoop.version>2.0.0-alpha</hadoop.version> + <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id> + </properties> + <dependencies> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + </profile> + + </profiles> + +</project> http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java new file mode 100644 index 0000000..aa24fa5 --- /dev/null +++ b/flume-tools/src/main/java/org/apache/flume/tools/FileChannelIntegrityTool.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.tools; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.flume.FlumeException; +import org.apache.flume.channel.file.CorruptEventException; +import org.apache.flume.channel.file.Log; +import org.apache.flume.channel.file.LogFile; +import org.apache.flume.channel.file.LogFileV3; +import org.apache.flume.channel.file.LogRecord; +import org.apache.flume.channel.file.Serialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class FileChannelIntegrityTool implements FlumeTool { + public static final Logger LOG = LoggerFactory.getLogger + (FileChannelIntegrityTool.class); + + private final List<File> dataDirs = new ArrayList<File>(); + + @Override + public void run(String[] args) throws IOException, ParseException { + boolean shouldContinue = parseCommandLineOpts(args); + if(!shouldContinue) { + LOG.error("Could not parse command line options. 
Exiting ..."); + System.exit(1); + } + for(File dataDir : dataDirs) { + File[] dataFiles = dataDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if(!name.endsWith(Serialization.METADATA_FILENAME) + && !name.endsWith(Serialization.METADATA_TMP_FILENAME) + && !name.endsWith(Serialization.OLD_METADATA_FILENAME) + && !name.equals(Log.FILE_LOCK)) { + return true; + } + return false; + } + }); + if (dataFiles != null && dataFiles.length > 0) { + for (File dataFile : dataFiles) { + LOG.info("Checking for corruption in " + dataFile.toString()); + LogFile.SequentialReader reader = + new LogFileV3.SequentialReader(dataFile, null); + LogFile.OperationRecordUpdater updater = new LogFile + .OperationRecordUpdater(dataFile); + boolean fileDone = false; + boolean fileBackedup = false; + while (!fileDone) { + long eventPosition = 0; + try { + // This depends on the fact that events are of the form: + // Type, length, data. + eventPosition = reader.getPosition(); + // Try to get the record, if the checksums don't match, + // this will throw a CorruptEventException - so the real logic + // is in the catch block below. + LogRecord record = reader.next(); + if (record != null) { + record.getEvent(); + } else { + fileDone = true; + } + } catch (CorruptEventException e) { + LOG.warn("Corruption found in " + dataFile.toString() + " at " + + eventPosition); + if (!fileBackedup) { + Serialization.copyFile(dataFile, new File(dataFile.getParent(), + dataFile.getName() + ".bak")); + fileBackedup = true; + } + updater.markRecordAsNoop(eventPosition); + } + } + updater.close(); + reader.close(); + } + } + } + } + + private boolean parseCommandLineOpts(String[] args) throws ParseException { + Options options = new Options(); + options + .addOption("l", "dataDirs", true, "Comma-separated list of data " + + "directories which the tool must verify. 
This option is mandatory") + .addOption("h", "help", false, "Display help"); + + CommandLineParser parser = new GnuParser(); + CommandLine commandLine = parser.parse(options, args); + if(commandLine.hasOption("help")) { + new HelpFormatter().printHelp("java -jar fcintegritytool ", + options, true); + return false; + } + if(!commandLine.hasOption("dataDirs")) { + new HelpFormatter().printHelp("java -jar fcintegritytool ", "", + options, "dataDirs is required.", true); + return false; + } else { + String dataDirStr[] = commandLine.getOptionValue("dataDirs").split(","); + for(String dataDir : dataDirStr) { + File f = new File(dataDir); + if(!f.exists()) { + throw new FlumeException("Data directory, " + dataDir + " does not " + + "exist."); + } + dataDirs.add(f); + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/src/main/java/org/apache/flume/tools/FlumeTool.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FlumeTool.java b/flume-tools/src/main/java/org/apache/flume/tools/FlumeTool.java new file mode 100644 index 0000000..28da8fe --- /dev/null +++ b/flume-tools/src/main/java/org/apache/flume/tools/FlumeTool.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.tools; + +public interface FlumeTool { + + public void run(String[] args) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java new file mode 100644 index 0000000..f886c89 --- /dev/null +++ b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolType.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.tools; + +public enum FlumeToolType { + FCINTEGRITYTOOL(FileChannelIntegrityTool.class); + + private final Class<? extends FlumeTool> klass; + private FlumeToolType(Class<? extends FlumeTool> klass) { + this.klass = klass; + } + + public Class<? 
extends FlumeTool> getClassInstance() { + return this.klass; + } + + public static String getNames() { + StringBuilder builder = new StringBuilder(); + for(FlumeToolType type: values()) { + builder.append(type.name().toLowerCase() + "\n"); + } + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java new file mode 100644 index 0000000..799ce85 --- /dev/null +++ b/flume-tools/src/main/java/org/apache/flume/tools/FlumeToolsMain.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.tools; + +import com.google.common.base.Preconditions; + +import java.util.Arrays; + +public class FlumeToolsMain implements FlumeTool { + + // Does the actual work in the run method so we can test it without actually + // having to start another process. 
+ + public static void main(String[] args) throws Exception { + new FlumeToolsMain().run(args); + } + + private FlumeToolsMain() { + //No op. + } + + @Override + public void run(String[] args) throws Exception{ + String error = "Expected name of tool and arguments for" + + " tool to be passed in on the command line. Please pass one of the " + + "following as arguments to this command: \n"; + StringBuilder builder = new StringBuilder(error); + for(FlumeToolType type : FlumeToolType.values()) { + builder.append(type.name()).append("\n"); + } + if(args == null || args.length == 0) { + System.out.println(builder.toString()); + System.exit(1); + } + String toolName = args[0]; + FlumeTool tool = null; + for(FlumeToolType type : FlumeToolType.values()) { + if(toolName.equalsIgnoreCase(type.name())) { + tool = type.getClassInstance().newInstance(); + break; + } + } + Preconditions.checkNotNull(tool, "Cannot find tool matching " + toolName + + ". Please select one of: \n " + FlumeToolType.getNames()); + if (args.length == 1) { + tool.run(new String[0]); + } else { + tool.run(Arrays.asList(args).subList(1, args.length). + toArray(new String[0])); + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java ---------------------------------------------------------------------- diff --git a/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java new file mode 100644 index 0000000..d328671 --- /dev/null +++ b/flume-tools/src/test/java/org/apache/flume/tools/TestFileChannelIntegrityTool.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.tools; + +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; +import org.apache.flume.channel.file.FileChannel; +import org.apache.flume.channel.file.FileChannelConfiguration; +import org.apache.flume.channel.file.Log; +import org.apache.flume.channel.file.LogFile; +import org.apache.flume.channel.file.LogFileV3; +import org.apache.flume.channel.file.LogRecord; +import org.apache.flume.channel.file.Serialization; +import org.apache.flume.channel.file.WriteOrderOracle; +import org.apache.flume.event.EventBuilder; +import org.junit.After; +import org.junit.AfterClass; +import static org.fest.reflect.core.Reflection.*; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.RandomAccessFile; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + + +public class TestFileChannelIntegrityTool { + private static File baseDir; + private static File origCheckpointDir; + private static File origDataDir; + private static Event event; + private static Context ctx; + + private File checkpointDir; + private File dataDir; + + + @BeforeClass + public static void setUpClass() throws Exception{ + 
createDataFiles(); + } + + @Before + public void setUp() throws Exception { + checkpointDir = new File(baseDir, "checkpoint"); + dataDir = new File(baseDir, "dataDir"); + Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory()); + Assert.assertTrue(dataDir.mkdirs() || dataDir.isDirectory()); + File[] dataFiles = origDataDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if(name.contains("lock")) { + return false; + } + return true; + } + }); + for(File dataFile : dataFiles) { + Serialization.copyFile(dataFile, new File(dataDir, dataFile.getName())); + } + } + + @After + public void tearDown() throws Exception { + FileUtils.deleteDirectory(checkpointDir); + FileUtils.deleteDirectory(dataDir); + } + + @AfterClass + public static void tearDownClass() throws Exception { + FileUtils.deleteDirectory(origCheckpointDir); + FileUtils.deleteDirectory(origDataDir); + } + + @Test + public void testFixCorruptRecords() throws Exception { + doTestFixCorruptEvents(false); + } + @Test + public void testFixCorruptRecordsWithCheckpoint() throws Exception { + doTestFixCorruptEvents(true); + } + + public void doTestFixCorruptEvents(boolean withCheckpoint) throws Exception { + Set<String> corruptFiles = new HashSet<String>(); + File[] files = dataDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if(name.contains("lock") || name.contains("meta")) { + return false; + } + return true; + } + }); + Random random = new Random(); + int corrupted = 0; + for (File dataFile : files) { + LogFile.SequentialReader reader = + new LogFileV3.SequentialReader(dataFile, null); + RandomAccessFile handle = new RandomAccessFile(dataFile, "rw"); + long eventPosition1 = reader.getPosition(); + LogRecord rec = reader.next(); + //No point corrupting commits, so ignore them + if(rec == null || + rec.getEvent().getClass().getName(). 
+ equals("org.apache.flume.channel.file.Commit")) { + handle.close(); + reader.close(); + continue; + } + long eventPosition2 = reader.getPosition(); + rec = reader.next(); + handle.seek(eventPosition1 + 100); + handle.writeInt(random.nextInt()); + corrupted++; + corruptFiles.add(dataFile.getName()); + if (rec == null || + rec.getEvent().getClass().getName(). + equals("org.apache.flume.channel.file.Commit")) { + handle.close(); + reader.close(); + continue; + } + handle.seek(eventPosition2 + 100); + handle.writeInt(random.nextInt()); + corrupted++; + handle.close(); + reader.close(); + + } + FileChannelIntegrityTool tool = new FileChannelIntegrityTool(); + tool.run(new String[] {"-l", dataDir.toString()}); + FileChannel channel = new FileChannel(); + channel.setName("channel"); + String cp; + if(withCheckpoint) { + cp = origCheckpointDir.toString(); + } else { + FileUtils.deleteDirectory(checkpointDir); + Assert.assertTrue(checkpointDir.mkdirs()); + cp = checkpointDir.toString(); + } + ctx.put(FileChannelConfiguration.CHECKPOINT_DIR,cp); + ctx.put(FileChannelConfiguration.DATA_DIRS, dataDir.toString()); + channel.configure(ctx); + channel.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + int i = 0; + while(channel.take() != null) { + i++; + } + tx.commit(); + tx.close(); + channel.stop(); + Assert.assertEquals(25 - corrupted, i); + files = dataDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if(name.contains(".bak")) { + return true; + } + return false; + } + }); + Assert.assertEquals(corruptFiles.size(), files.length); + for(File file : files) { + String name = file.getName(); + name = name.replaceAll(".bak", ""); + Assert.assertTrue(corruptFiles.remove(name)); + } + Assert.assertTrue(corruptFiles.isEmpty()); + } + + private static void createDataFiles() throws Exception { + final byte[] eventData = new byte[2000]; + for(int i = 0; i < 2000; i++) { + eventData[i] = 1; + } + 
WriteOrderOracle.setSeed(System.currentTimeMillis()); + event = EventBuilder.withBody(eventData); + baseDir = Files.createTempDir(); + if(baseDir.exists()) { + FileUtils.deleteDirectory(baseDir); + } + baseDir = Files.createTempDir(); + origCheckpointDir = new File(baseDir, "chkpt"); + Assert.assertTrue(origCheckpointDir.mkdirs() || origCheckpointDir.isDirectory()); + origDataDir = new File(baseDir, "data"); + Assert.assertTrue(origDataDir.mkdirs() || origDataDir.isDirectory()); + FileChannel channel = new FileChannel(); + channel.setName("channel"); + ctx = new Context(); + ctx.put(FileChannelConfiguration.CAPACITY, "1000"); + ctx.put(FileChannelConfiguration.CHECKPOINT_DIR, origCheckpointDir.toString()); + ctx.put(FileChannelConfiguration.DATA_DIRS, origDataDir.toString()); + ctx.put(FileChannelConfiguration.MAX_FILE_SIZE, "10000"); + ctx.put(FileChannelConfiguration.TRANSACTION_CAPACITY, "100"); + channel.configure(ctx); + channel.start(); + for (int j = 0; j < 5; j++) { + Transaction tx = channel.getTransaction(); + tx.begin(); + for (int i = 0; i < 5; i++) { + channel.put(event); + } + tx.commit(); + tx.close(); + } + Log log = field("log") + .ofType(Log.class) + .in(channel) + .get(); + + Assert.assertTrue("writeCheckpoint returned false", + method("writeCheckpoint") + .withReturnType(Boolean.class) + .withParameterTypes(Boolean.class) + .in(log) + .invoke(true)); + channel.stop(); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9982f5d..8026936 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ limitations under the License. <module>flume-ng-clients</module> <module>flume-ng-sdk</module> <module>flume-ng-tests</module> + <module>flume-tools</module> </modules> <profiles> @@ -888,6 +889,12 @@ limitations under the License. 
<dependency> <groupId>org.apache.flume</groupId> + <artifactId>flume-tools</artifactId> + <version>1.4.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> <artifactId>flume-ng-node</artifactId> <version>1.4.0-SNAPSHOT</version> </dependency>
