FLUME-1586: File Channel should support verifying integrity of individual events.
(Hari Shreedharan via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5e53a056 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5e53a056 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5e53a056 Branch: refs/heads/flume-1.4 Commit: 5e53a056b0ed749d928008278a0949467fefa0d9 Parents: 48504e8 Author: Brock Noland <[email protected]> Authored: Wed Jun 12 17:35:38 2013 -0700 Committer: Brock Noland <[email protected]> Committed: Wed Jun 12 17:36:09 2013 -0700 ---------------------------------------------------------------------- bin/flume-ng | 6 + .../channel/file/CorruptEventException.java | 36 + .../apache/flume/channel/file/FileChannel.java | 47 +- .../java/org/apache/flume/channel/file/Log.java | 12 +- .../org/apache/flume/channel/file/LogFile.java | 104 +- .../apache/flume/channel/file/LogFileV3.java | 18 +- .../flume/channel/file/NoopRecordException.java | 35 + .../java/org/apache/flume/channel/file/Put.java | 38 +- .../flume/channel/file/ReplayHandler.java | 3 +- .../flume/channel/file/Serialization.java | 15 +- .../channel/file/TransactionEventRecord.java | 10 +- .../flume/channel/file/proto/ProtosFactory.java | 1127 +++++++++--------- .../src/main/proto/filechannel.proto | 1 + .../flume/channel/file/TestFileChannel.java | 44 +- .../org/apache/flume/channel/file/TestLog.java | 28 +- .../apache/flume/channel/file/TestLogFile.java | 111 +- .../file/TestTransactionEventRecordV3.java | 16 +- .../apache/flume/channel/file/TestUtils.java | 55 +- flume-ng-dist/pom.xml | 4 + flume-ng-dist/src/main/assembly/bin.xml | 1 + flume-ng-dist/src/main/assembly/src.xml | 2 + flume-tools/pom.xml | 143 +++ .../flume/tools/FileChannelIntegrityTool.java | 142 +++ .../java/org/apache/flume/tools/FlumeTool.java | 24 + .../org/apache/flume/tools/FlumeToolType.java | 40 + .../org/apache/flume/tools/FlumeToolsMain.java | 68 ++ .../tools/TestFileChannelIntegrityTool.java | 247 ++++ pom.xml | 7 
+ 28 files changed, 1765 insertions(+), 619 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/bin/flume-ng ---------------------------------------------------------------------- diff --git a/bin/flume-ng b/bin/flume-ng index 22b95b8..65cc985 100755 --- a/bin/flume-ng +++ b/bin/flume-ng @@ -26,6 +26,7 @@ FLUME_AGENT_CLASS="org.apache.flume.node.Application" FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient" FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo" +FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain" CLEAN_FLAG=1 ################################ @@ -261,6 +262,9 @@ case "$mode" in avro-client) opt_avro_client=1 ;; + tool) + opt_tool=1 + ;; version) opt_version=1 CLEAN_FLAG=0 @@ -433,6 +437,8 @@ elif [ -n "$opt_avro_client" ] ; then run_flume $FLUME_AVRO_CLIENT_CLASS $args elif [ -n "${opt_version}" ] ; then run_flume $FLUME_VERSION_CLASS $args +elif [ -n "${opt_tool}" ] ; then + run_flume $FLUME_TOOLS_CLASS $args else error "This message should never appear" 1 fi http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java new file mode 100644 index 0000000..691d291 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CorruptEventException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + + +public class CorruptEventException extends Exception { + + private static final long serialVersionUID = -2986946303540798416L; + public CorruptEventException() { + super(); + } + + public CorruptEventException(String msg) { + super(msg); + } + + public CorruptEventException(String msg, Throwable th) { + super(msg, th); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java index cc0d38a..36f150b 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java @@ -100,6 +100,7 @@ public class FileChannel extends BasicChannelSemantics { private String encryptionActiveKey; private String encryptionCipherProvider; private boolean useDualCheckpoints; + private boolean isTest = false; @Override public synchronized void setName(String name) { @@ -451,6 +452,7 @@ public class FileChannel extends 
BasicChannelSemantics { private String getStateAsString() { return String.valueOf(getState()); } + @Override protected void doPut(Event event) throws InterruptedException { channelCounter.incrementEventPutAttemptCount(); @@ -511,23 +513,40 @@ public class FileChannel extends BasicChannelSemantics { + "log. Try increasing the log write timeout value. " + channelNameDescriptor); } + + /* + * 1. Take an event which is in the queue. + * 2. If getting that event does not throw NoopRecordException, + * then return it. + * 3. Else try to retrieve the next event from the queue + * 4. Repeat 2 and 3 until queue is empty or an event is returned. + */ + try { - FlumeEventPointer ptr = queue.removeHead(transactionID); - if(ptr != null) { - try { - // first add to takeList so that if write to disk - // fails rollback actually does it's work - Preconditions.checkState(takeList.offer(ptr), "takeList offer failed " - + channelNameDescriptor); - log.take(transactionID, ptr); // write take to disk - Event event = log.get(ptr); - return event; - } catch (IOException e) { - throw new ChannelException("Take failed due to IO error " - + channelNameDescriptor, e); + while (true) { + FlumeEventPointer ptr = queue.removeHead(transactionID); + if (ptr == null) { + return null; + } else { + try { + // first add to takeList so that if write to disk + // fails rollback actually does it's work + Preconditions.checkState(takeList.offer(ptr), + "takeList offer failed " + + channelNameDescriptor); + log.take(transactionID, ptr); // write take to disk + Event event = log.get(ptr); + return event; + } catch (IOException e) { + throw new ChannelException("Take failed due to IO error " + + channelNameDescriptor, e); + } catch (NoopRecordException e) { + LOG.warn("Corrupt record replaced by File Channel Integrity " + + "tool found. 
Will retrieve next event", e); + takeList.remove(ptr); + } } } - return null; } finally { log.unlockShared(); } http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 1918baa..8dc0ff8 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -73,7 +73,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; */ @InterfaceAudience.Private @InterfaceStability.Unstable -class Log { +public class Log { public static final String PREFIX = "log-"; private static final Logger LOGGER = LoggerFactory.getLogger(Log.class); private static final int MIN_NUM_LOGS = 2; @@ -571,12 +571,18 @@ class Log { * @throws InterruptedException */ FlumeEvent get(FlumeEventPointer pointer) throws IOException, - InterruptedException { + InterruptedException, NoopRecordException { Preconditions.checkState(open, "Log is closed"); int id = pointer.getFileID(); LogFile.RandomReader logFile = idLogFileMap.get(id); Preconditions.checkNotNull(logFile, "LogFile is null for id " + id); - return logFile.get(pointer.getOffset()); + try { + return logFile.get(pointer.getOffset()); + } catch (CorruptEventException ex) { + open = false; + throw new IOException("Corrupt event found. 
Please run File Channel " + "Integrity tool.", ex); + } } /** http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java index d3db896..bb8ce1a 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java @@ -22,6 +22,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.flume.ChannelException; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.channel.file.encryption.CipherProvider; import org.apache.flume.channel.file.encryption.KeyProvider; import org.apache.flume.tools.DirectMemoryUtils; @@ -31,6 +34,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.EOFException; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; @@ -40,7 +44,9 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicLong; -abstract class LogFile { +@InterfaceAudience.Private +@InterfaceStability.Unstable +public abstract class LogFile { private static final Logger LOG = LoggerFactory .getLogger(LogFile.class); @@ -54,8 +60,9 @@ abstract class LogFile { private static final ByteBuffer FILL = DirectMemoryUtils.
allocate(1024 * 1024); // preallocation, 1MB - protected static final byte OP_RECORD = Byte.MAX_VALUE; - protected static final byte OP_EOF = Byte.MIN_VALUE; + public static final byte OP_RECORD = Byte.MAX_VALUE; + public static final byte OP_NOOP = (Byte.MAX_VALUE + Byte.MIN_VALUE)/2; + public static final byte OP_EOF = Byte.MIN_VALUE; static { for (int i = 0; i < FILL.capacity(); i++) { @@ -63,6 +70,13 @@ abstract class LogFile { } } + protected static void skipRecord(RandomAccessFile fileHandle, + int offset) throws IOException { + fileHandle.seek(offset); + int length = fileHandle.readInt(); + fileHandle.skipBytes(length); + } + abstract static class MetaDataWriter { private final File file; private final int logFileID; @@ -296,6 +310,48 @@ abstract class LogFile { } } + /** + * This is an class meant to be an internal Flume API, + * and can change at any time. Intended to be used only from File Channel Integrity + * test tool. Not to be used for any other purpose. + */ + public static class OperationRecordUpdater { + private final RandomAccessFile fileHandle; + private final File file; + + public OperationRecordUpdater(File file) throws FileNotFoundException { + Preconditions.checkState(file.exists(), "File to update, " + + file.toString() + " does not exist."); + this.file = file; + fileHandle = new RandomAccessFile(file, "rw"); + } + + public void markRecordAsNoop(long offset) throws IOException { + // First ensure that the offset actually is an OP_RECORD. There is a + // small possibility that it still is OP_RECORD, + // but is not actually the beginning of a record. Is there anything we + // can do about it? 
+ fileHandle.seek(offset); + byte byteRead = fileHandle.readByte(); + Preconditions.checkState(byteRead == OP_RECORD || byteRead == OP_NOOP, + "Expected to read a record but the byte read indicates EOF"); + fileHandle.seek(offset); + LOG.info("Marking event as " + OP_NOOP + " at " + offset + " for file " + + file.toString()); + fileHandle.writeByte(OP_NOOP); + } + + public void close() { + try { + fileHandle.getFD().sync(); + fileHandle.close(); + } catch (IOException e) { + LOG.error("Could not close file handle to file " + + fileHandle.toString(), e); + } + } + } + static abstract class RandomReader { private final File file; private final BlockingQueue<RandomAccessFile> readFileHandles = @@ -311,7 +367,7 @@ abstract class LogFile { } protected abstract TransactionEventRecord doGet(RandomAccessFile fileHandle) - throws IOException; + throws IOException, CorruptEventException; abstract int getVersion(); @@ -323,13 +379,18 @@ abstract class LogFile { return encryptionKeyProvider; } - FlumeEvent get(int offset) throws IOException, InterruptedException { + FlumeEvent get(int offset) throws IOException, InterruptedException, + CorruptEventException, NoopRecordException { Preconditions.checkState(open, "File closed"); RandomAccessFile fileHandle = checkOut(); boolean error = true; try { fileHandle.seek(offset); byte operation = fileHandle.readByte(); + if(operation == OP_NOOP) { + throw new NoopRecordException("No op record found. 
Corrupt record " + + "may have been repaired by File Channel Integrity tool"); + } Preconditions.checkState(operation == OP_RECORD, Integer.toHexString(operation)); TransactionEventRecord record = doGet(fileHandle); @@ -408,7 +469,7 @@ abstract class LogFile { } } - static abstract class SequentialReader { + public static abstract class SequentialReader { private final RandomAccessFile fileHandle; private final FileChannel fileChannel; @@ -434,7 +495,7 @@ abstract class LogFile { fileHandle = new RandomAccessFile(file, "r"); fileChannel = fileHandle.getChannel(); } - abstract LogRecord doNext(int offset) throws IOException; + abstract LogRecord doNext(int offset) throws IOException, CorruptEventException; abstract int getVersion(); @@ -488,7 +549,7 @@ abstract class LogFile { } } - LogRecord next() throws IOException { + public LogRecord next() throws IOException, CorruptEventException { int offset = -1; try { long position = fileChannel.position(); @@ -499,14 +560,26 @@ abstract class LogFile { } offset = (int) position; Preconditions.checkState(offset >= 0); - byte operation = fileHandle.readByte(); - if(operation != OP_RECORD) { - if(operation == OP_EOF) { + while (offset < fileHandle.length()) { + byte operation = fileHandle.readByte(); + if (operation == OP_RECORD) { + break; + } else if (operation == OP_EOF) { LOG.info("Encountered EOF at " + offset + " in " + file); + return null; + } else if (operation == OP_NOOP) { + LOG.info("No op event found in file: " + file.toString() + + " at " + offset + ". 
Skipping event."); + skipRecord(fileHandle, offset + 1); + offset = (int) fileHandle.getFilePointer(); + continue; } else { LOG.error("Encountered non op-record at " + offset + " " + - Integer.toHexString(operation) + " in " + file); + Integer.toHexString(operation) + " in " + file); + return null; } + } + if(offset >= fileHandle.length()) { return null; } return doNext(offset); @@ -518,7 +591,10 @@ abstract class LogFile { } } - void close() { + public long getPosition() throws IOException { + return fileChannel.position(); + } + public void close() { if(fileHandle != null) { try { fileHandle.close(); @@ -540,7 +616,7 @@ abstract class LogFile { return buffer; } - public static void main(String[] args) throws EOFException, IOException { + public static void main(String[] args) throws EOFException, IOException, CorruptEventException { File file = new File(args[0]); LogFile.SequentialReader reader = null; try { http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java index d9a2a9b..38f6ecb 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java @@ -21,6 +21,8 @@ package org.apache.flume.channel.file; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import com.google.protobuf.GeneratedMessage; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.channel.file.encryption.CipherProvider; import 
org.apache.flume.channel.file.encryption.CipherProviderFactory; import org.apache.flume.channel.file.encryption.KeyProvider; @@ -43,7 +45,9 @@ import java.util.concurrent.LinkedBlockingDeque; * Represents a single data file on disk. Has methods to write, * read sequentially (replay), and read randomly (channel takes). */ -class LogFileV3 extends LogFile { +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class LogFileV3 extends LogFile { protected static final Logger LOGGER = LoggerFactory.getLogger(LogFileV3.class); @@ -267,7 +271,7 @@ class LogFileV3 extends LogFile { } @Override protected TransactionEventRecord doGet(RandomAccessFile fileHandle) - throws IOException { + throws IOException, CorruptEventException { // readers are opened right when the file is created and thus // empty. As such we wait to initialize until there is some // data before we we initialize @@ -297,10 +301,11 @@ class LogFileV3 extends LogFile { } } - static class SequentialReader extends LogFile.SequentialReader { + public static class SequentialReader extends LogFile.SequentialReader { private CipherProvider.Decryptor decryptor; - SequentialReader(File file, @Nullable KeyProvider encryptionKeyProvider) - throws EOFException, IOException { + + public SequentialReader(File file, @Nullable KeyProvider + encryptionKeyProvider) throws EOFException, IOException { super(file, encryptionKeyProvider); File metaDataFile = Serialization.getMetaDataFile(file); FileInputStream inputStream = new FileInputStream(metaDataFile); @@ -344,8 +349,9 @@ class LogFileV3 extends LogFile { public int getVersion() { return Serialization.VERSION_3; } + @Override - LogRecord doNext(int offset) throws IOException { + LogRecord doNext(int offset) throws IOException, CorruptEventException { byte[] buffer = readDelimitedBuffer(getFileHandle()); if(decryptor != null) { buffer = decryptor.decrypt(buffer);
http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java new file mode 100644 index 0000000..5f446b8 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/NoopRecordException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.channel.file; + +public class NoopRecordException extends Exception { + private static final long serialVersionUID = -7394180633208889738L; + + public NoopRecordException() { + super(); + } + + public NoopRecordException(String msg) { + super(msg); + } + + public NoopRecordException(String msg, Throwable th) { + super(msg, th); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java index 4235a79..f08f024 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java @@ -24,7 +24,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Map; +import java.util.zip.CRC32; +import java.util.zip.Checksum; +import com.google.common.annotations.VisibleForTesting; import org.apache.flume.channel.file.proto.ProtosFactory; import com.google.common.base.Preconditions; @@ -36,13 +39,19 @@ import com.google.protobuf.ByteString; */ class Put extends TransactionEventRecord { private FlumeEvent event; + // Should we move this to a higher level to not make multiple instances? + // Doing that might cause performance issues, since access to this would + // need to be synchronized (the whole reset-update-getValue cycle would + // need to be). 
+ private final Checksum checksum = new CRC32(); + @VisibleForTesting Put(Long transactionID, Long logWriteOrderID) { - super(transactionID, logWriteOrderID); + this(transactionID, logWriteOrderID, null); } Put(Long transactionID, Long logWriteOrderID, FlumeEvent event) { - this(transactionID, logWriteOrderID); + super(transactionID, logWriteOrderID); this.event = event; } @@ -78,11 +87,14 @@ class Put extends TransactionEventRecord { } } eventBuilder.setBody(ByteString.copyFrom(event.getBody())); - putBuilder.setEvent(eventBuilder.build()); + ProtosFactory.FlumeEvent protoEvent = eventBuilder.build(); + putBuilder.setEvent(protoEvent); + putBuilder.setChecksum(calculateChecksum(event.getBody())); putBuilder.build().writeDelimitedTo(out); } @Override - void readProtos(InputStream in) throws IOException { + void readProtos(InputStream in) throws IOException, + CorruptEventException { ProtosFactory.Put put = Preconditions.checkNotNull(ProtosFactory. Put.parseDelimitedFrom(in), "Put cannot be null"); Map<String, String> headers = Maps.newHashMap(); @@ -90,9 +102,25 @@ class Put extends TransactionEventRecord { for(ProtosFactory.FlumeEventHeader header : protosEvent.getHeadersList()) { headers.put(header.getKey(), header.getValue()); } + byte[] eventBody = protosEvent.getBody().toByteArray(); + + if (put.hasChecksum()) { + long eventBodyChecksum = calculateChecksum(eventBody); + if (eventBodyChecksum != put.getChecksum()) { + throw new CorruptEventException("Expected checksum for event was " + + eventBodyChecksum + " but the checksum of the event is " + put.getChecksum()); + } + } // TODO when we remove v2, remove FlumeEvent and use EventBuilder here - event = new FlumeEvent(headers, protosEvent.getBody().toByteArray()); + event = new FlumeEvent(headers, eventBody); + } + + protected long calculateChecksum(byte[] body) { + checksum.reset(); + checksum.update(body, 0, body.length); + return checksum.getValue(); } + @Override public short getRecordType() { return 
Type.PUT.get(); http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java index fc47b23..c8f5fdd 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java @@ -26,6 +26,7 @@ import com.google.common.collect.SetMultimap; import com.google.common.collect.Sets; import org.apache.commons.collections.MultiMap; import org.apache.commons.collections.map.MultiValueMap; +import org.apache.flume.ChannelException; import org.apache.flume.channel.file.encryption.KeyProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -373,7 +374,7 @@ class ReplayHandler { } } } - private LogRecord next() throws IOException { + private LogRecord next() throws IOException, CorruptEventException { LogRecord resultLogRecord = logRecordBuffer.poll(); if(resultLogRecord != null) { // there is more log records to read http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java index d6897e1..f8160d9 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java +++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java @@ -21,6 +21,8 @@ package org.apache.flume.channel.file; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import org.apache.commons.io.FileUtils; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +35,9 @@ import java.io.RandomAccessFile; import java.util.Collections; import java.util.Set; -class Serialization { +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class Serialization { private Serialization() {} static final long SIZE_OF_INT = 4; @@ -43,9 +47,10 @@ class Serialization { static final int VERSION_2 = 2; static final int VERSION_3 = 3; - static final String METADATA_FILENAME = ".meta"; - static final String METADATA_TMP_FILENAME = ".tmp"; - static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old"; + public static final String METADATA_FILENAME = ".meta"; + public static final String METADATA_TMP_FILENAME = ".tmp"; + public static final String OLD_METADATA_FILENAME = METADATA_FILENAME + + ".old"; // 64 K buffer to copy files.
private static final int FILE_COPY_BUFFER_SIZE = 64 * 1024; @@ -121,7 +126,7 @@ class Serialization { * @param to Destination file - this file should not exist * @return true if the copy was successful */ - static boolean copyFile(File from, File to) throws IOException { + public static boolean copyFile(File from, File to) throws IOException { Preconditions.checkNotNull(from, "Source file is null, file copy failed."); Preconditions.checkNotNull(to, "Destination file is null, " + "file copy failed."); http://git-wip-us.apache.org/repos/asf/flume/blob/5e53a056/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java index 073042f..dda9b3f 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/TransactionEventRecord.java @@ -29,6 +29,8 @@ import java.io.OutputStream; import java.lang.reflect.Constructor; import java.nio.ByteBuffer; +import org.apache.flume.annotations.InterfaceAudience; +import org.apache.flume.annotations.InterfaceStability; import org.apache.flume.channel.file.proto.ProtosFactory; import org.apache.hadoop.io.Writable; import org.slf4j.Logger; @@ -41,7 +43,9 @@ import com.google.common.collect.ImmutableMap; /** * Base class for records in data file: Put, Take, Rollback, Commit */ -abstract class TransactionEventRecord implements Writable { +@InterfaceAudience.Private +@InterfaceStability.Unstable +public abstract class TransactionEventRecord implements Writable { private static final Logger LOG = LoggerFactory .getLogger(TransactionEventRecord.class); private final long
transactionID; @@ -63,7 +67,7 @@ abstract class TransactionEventRecord implements Writable { abstract void writeProtos(OutputStream out) throws IOException; - abstract void readProtos(InputStream in) throws IOException; + abstract void readProtos(InputStream in) throws IOException, CorruptEventException; long getLogWriteOrderID() { return logWriteOrderID; @@ -187,7 +191,7 @@ abstract class TransactionEventRecord implements Writable { static TransactionEventRecord fromByteArray(byte[] buffer) - throws IOException { + throws IOException, CorruptEventException { ByteArrayInputStream in = new ByteArrayInputStream(buffer); try { ProtosFactory.TransactionEventHeader header = Preconditions.
