Updated Branches: refs/heads/flume-1.3.0 b6686e7d9 -> 80176f340
FLUME-1593. FileChannel race condition when log file rolls. (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/80176f34 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/80176f34 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/80176f34 Branch: refs/heads/flume-1.3.0 Commit: 80176f340aa9aeb7a9b68a98161c65c4f7218e2b Parents: b6686e7 Author: Hari Shreedharan <[email protected]> Authored: Fri Sep 21 21:42:52 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Sep 21 21:44:24 2012 -0700 ---------------------------------------------------------------------- .../java/org/apache/flume/channel/file/Log.java | 81 ++++++++++----- .../org/apache/flume/channel/file/LogFile.java | 13 ++- .../channel/file/LogFileRetryableIOException.java | 34 ++++++ .../apache/flume/channel/file/TestFileChannel.java | 2 +- .../org/apache/flume/channel/file/TestLogFile.java | 9 +- 5 files changed, 103 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/80176f34/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index e36eafb..1072259 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -425,16 +425,23 @@ class Log { Put put = new Put(transactionID, WriteOrderOracle.next(), flumeEvent); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put); int logFileIndex = nextLogWriter(transactionID); - if (logFiles.get(logFileIndex).isRollRequired(buffer)) { - roll(logFileIndex, buffer); - } boolean error = true; try { - FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer); - error = false; - return ptr; + try { + FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer); + error = false; + return ptr; + } catch (LogFileRetryableIOException e) { + if(!open) { + throw e; + } + roll(logFileIndex, buffer); + FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer); + error = false; + return ptr; + } } finally { - if (error) { + if(error && open) { roll(logFileIndex); } } @@ -455,15 +462,21 @@ class Log { pointer.getOffset(), pointer.getFileID()); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take); int logFileIndex = nextLogWriter(transactionID); - if (logFiles.get(logFileIndex).isRollRequired(buffer)) { - roll(logFileIndex, buffer); - } boolean error = true; try { - logFiles.get(logFileIndex).take(buffer); - error = false; + try { + logFiles.get(logFileIndex).take(buffer); + error = false; + } catch (LogFileRetryableIOException e) { + if(!open) { + throw e; + } + roll(logFileIndex, buffer); + logFiles.get(logFileIndex).take(buffer); + error = false; + } } finally { - if (error) { + if(error && open) { roll(logFileIndex); } } @@ -485,15 +498,21 @@ class Log { Rollback rollback = new Rollback(transactionID, WriteOrderOracle.next()); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback); int logFileIndex = nextLogWriter(transactionID); - if (logFiles.get(logFileIndex).isRollRequired(buffer)) { - roll(logFileIndex, buffer); - } boolean error = true; try { - logFiles.get(logFileIndex).rollback(buffer); - error = false; + try { + logFiles.get(logFileIndex).rollback(buffer); + error = false; + } catch (LogFileRetryableIOException e) { + if(!open) { + throw e; + } + roll(logFileIndex, buffer); + logFiles.get(logFileIndex).rollback(buffer); + error = false; + } } finally { - if (error) { + if(error && open) { roll(logFileIndex); } } @@ -631,25 +650,31 @@ class Log { * @throws IOException */ private void commit(long transactionID, short type) throws IOException { - Preconditions.checkState(open, "Log is closed"); Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit); int logFileIndex = nextLogWriter(transactionID); - if (logFiles.get(logFileIndex).isRollRequired(buffer)) { - roll(logFileIndex, buffer); - } boolean error = true; try { - logFiles.get(logFileIndex).commit(buffer); - error = false; + try { + logFiles.get(logFileIndex).commit(buffer); + error = false; + } catch (LogFileRetryableIOException e) { + if(!open) { + throw e; + } + roll(logFileIndex, buffer); + logFiles.get(logFileIndex).commit(buffer); + error = false; + } } finally { - if (error) { + if(error && open) { roll(logFileIndex); } } } + /** * Atomic so not synchronization required. * @return @@ -660,6 +685,7 @@ class Log { /** * Unconditionally roll * Synchronization done internally + * * @param index * @throws IOException */ @@ -677,10 +703,11 @@ class Log { * methods call this method, and this method acquires only a * read lock. The synchronization guarantees that multiple threads don't * roll at the same time. + * * @param index * @throws IOException */ - private synchronized void roll(int index, ByteBuffer buffer) + private synchronized void roll(int index, ByteBuffer buffer) throws IOException { if (!tryLockShared()) { throw new IOException("Failed to obtain lock for writing to the log. " http://git-wip-us.apache.org/repos/asf/flume/blob/80176f34/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java index 8071140..a2c790c 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java @@ -187,10 +187,15 @@ abstract class LogFile { sync(); } private Pair<Integer, Integer> write(ByteBuffer buffer) throws IOException { - Preconditions.checkState(isOpen(), "File closed"); + if(!isOpen()) { + throw new LogFileRetryableIOException("File closed " + file); + } long length = position(); long expectedLength = length + (long) buffer.limit(); - Preconditions.checkArgument(expectedLength < (long) Integer.MAX_VALUE); + if(expectedLength > maxFileSize) { + throw new LogFileRetryableIOException(expectedLength + " > " + + maxFileSize); + } int offset = (int)length; Preconditions.checkState(offset >= 0, String.valueOf(offset)); // OP_RECORD + size + buffer @@ -208,7 +213,9 @@ abstract class LogFile { return isOpen() && position() + (long) buffer.limit() > getMaxSize(); } private void sync() throws IOException { - Preconditions.checkState(isOpen(), "File closed"); + if(!isOpen()) { + throw new LogFileRetryableIOException("File closed " + file); + } getFileChannel().force(false); } http://git-wip-us.apache.org/repos/asf/flume/blob/80176f34/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java new file mode 100644 index 0000000..9447652 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileRetryableIOException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import java.io.IOException; + +public class LogFileRetryableIOException extends IOException { + private static final long serialVersionUID = -2747112999806160431L; + public LogFileRetryableIOException() { + super(); + } + public LogFileRetryableIOException(String msg) { + super(msg); + } + public LogFileRetryableIOException(String msg, Throwable t) { + super(msg, t); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/80176f34/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 8baf8fe..c12e7d2 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -450,7 +450,7 @@ public class TestFileChannel extends TestFileChannelBase { public void testReferenceCounts() throws Exception { Map<String, String> overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000"); - overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "20"); + overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "100"); final FileChannel channel = createFileChannel(overrides); channel.start(); putEvents(channel, "testing-reference-counting", 1, 15); http://git-wip-us.apache.org/repos/asf/flume/blob/80176f34/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java index 87d9c3f..9fc834e 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java @@ -52,8 +52,8 @@ public class TestLogFile { dataDir = Files.createTempDir(); dataFile = new File(dataDir, String.valueOf(fileID)); Assert.assertTrue(dataDir.isDirectory()); - logFileWriter = LogFileFactory.getWriter(dataFile, fileID, 1000, null, null, - null); + logFileWriter = LogFileFactory.getWriter(dataFile, fileID, + Integer.MAX_VALUE, null, null, null); } @After public void cleanup() throws IOException { @@ -69,7 +69,7 @@ public class TestLogFile { public void testWriterRefusesToOverwriteFile() throws IOException { Assert.assertTrue(dataFile.isFile() || dataFile.createNewFile()); try { - LogFileFactory.getWriter(dataFile, fileID, 1000, null, null, + LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null, null); Assert.fail(); } catch (IllegalStateException e) { @@ -83,7 +83,7 @@ public class TestLogFile { Assert.assertFalse(dataFile.exists()); Assert.assertTrue(dataFile.mkdirs()); try { - LogFileFactory.getWriter(dataFile, fileID, 1000, null, null, + LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null, null); Assert.fail(); } catch (IllegalStateException e) { @@ -138,7 +138,6 @@ public class TestLogFile { for(Throwable throwable : errors) { Throwables.propagate(throwable); } - Assert.assertTrue(logFileWriter.isRollRequired(ByteBuffer.allocate(0))); } @Test public void testReader() throws InterruptedException, IOException {
