Repository: flume
Updated Branches:
  refs/heads/flume-1.6 da294a164 -> 13a03003e
FLUME-2586. HDFS Sink must try to rename files even if close fails. (Johny Rufus via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/13a03003 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/13a03003 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/13a03003 Branch: refs/heads/flume-1.6 Commit: 13a03003e7c1817d73451d3b6b1a58250c9469bd Parents: da294a1 Author: Hari Shreedharan <[email protected]> Authored: Wed Jan 28 13:02:50 2015 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Wed Jan 28 13:05:51 2015 -0800 ---------------------------------------------------------------------- .../apache/flume/sink/hdfs/BucketWriter.java | 64 ++++++++------------ .../apache/flume/sink/hdfs/MockFileSystem.java | 44 ++++++++------ .../flume/sink/hdfs/MockFsDataOutputStream.java | 20 ++---- .../flume/sink/hdfs/TestBucketWriter.java | 32 +++++----- .../flume/sink/hdfs/TestHDFSEventSink.java | 18 ++++-- 5 files changed, 83 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/13a03003/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index f9e39ac..62f4eee 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -109,12 +109,12 @@ class BucketWriter { private Clock clock = new SystemClock(); private final long retryInterval; - private final int maxCloseTries; + private final int maxRenameTries; // flag that the bucket writer was closed due to 
idling and thus shouldn't be // reopened. Not ideal, but avoids internals of owners protected boolean closed = false; - AtomicInteger closeTries = new AtomicInteger(0); + AtomicInteger renameTries = new AtomicInteger(0); BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize, Context context, String filePath, String fileName, String inUsePrefix, @@ -148,7 +148,7 @@ class BucketWriter { fileExtensionCounter = new AtomicLong(clock.currentTimeMillis()); this.retryInterval = retryInterval; - this.maxCloseTries = maxCloseTries; + this.maxRenameTries = maxCloseTries; isOpen = false; isUnderReplicated = false; this.writer.configure(context); @@ -336,55 +336,32 @@ class BucketWriter { private final HDFSWriter localWriter = writer; @Override public Void call() throws Exception { - LOG.info("Close tries incremented"); - closeTries.incrementAndGet(); localWriter.close(); // could block return null; } }; } - private Callable<Void> createScheduledCloseCallable( - final CallRunner<Void> closeCallRunner) { + private Callable<Void> createScheduledRenameCallable() { return new Callable<Void>() { private final String path = bucketPath; private final String finalPath = targetPath; private FileSystem fs = fileSystem; - private boolean closeSuccess = false; - private Path tmpFilePath = new Path(path); - private int closeTries = 1; // one attempt is already done - private final CallRunner<Void> closeCall = closeCallRunner; + private int renameTries = 1; // one attempt is already done @Override public Void call() throws Exception { - if (closeTries >= maxCloseTries) { - LOG.warn("Unsuccessfully attempted to close " + path + " " + - maxCloseTries + " times. File may be open, " + - "or may not have been renamed." ); + if (renameTries >= maxRenameTries) { + LOG.warn("Unsuccessfully attempted to rename " + path + " " + + maxRenameTries + " times. 
File may still be open."); return null; } - closeTries++; + renameTries++; try { - if (!closeSuccess) { - if (isClosedMethod == null) { - LOG.debug("isFileClosed method is not available in " + - "the version of HDFS client being used. " + - "Not attempting to close file again"); - return null; - } - if (!isFileClosed(fs, tmpFilePath)) { - callWithTimeout(closeCall); - } - // It is possible rename failing causes this thread - // to get rescheduled. In that case, - // don't check with NN if close succeeded as we know - // it did. This helps avoid an unnecessary RPC call. - closeSuccess = true; - } renameBucket(path, finalPath, fs); } catch (Exception e) { - LOG.warn("Closing file: " + path + " failed. Will " + + LOG.warn("Renaming file: " + path + " failed. Will " + "retry again in " + retryInterval + " seconds.", e); timedRollerPool.schedule(this, retryInterval, TimeUnit.SECONDS); @@ -422,10 +399,6 @@ class BucketWriter { "). Exception follows.", e); sinkCounter.incrementConnectionFailedCount(); failedToClose = true; - final Callable<Void> scheduledClose = - createScheduledCloseCallable(closeCallRunner); - timedRollerPool.schedule(scheduledClose, retryInterval, - TimeUnit.SECONDS); } isOpen = false; } else { @@ -443,10 +416,20 @@ class BucketWriter { idleFuture = null; } - // Don't rename file if this failed to close - if (bucketPath != null && fileSystem != null && !failedToClose) { + if (bucketPath != null && fileSystem != null) { // could block or throw IOException - renameBucket(bucketPath, targetPath, fileSystem); + try { + renameBucket(bucketPath, targetPath, fileSystem); + } catch(Exception e) { + LOG.warn( + "failed to rename() file (" + bucketPath + + "). 
Exception follows.", e); + sinkCounter.incrementConnectionFailedCount(); + final Callable<Void> scheduledRename = + createScheduledRenameCallable(); + timedRollerPool.schedule(scheduledRename, retryInterval, + TimeUnit.SECONDS); + } } if (callCloseCallback) { runCloseAction(); @@ -671,6 +654,7 @@ class BucketWriter { public Void call() throws Exception { if (fs.exists(srcPath)) { // could block LOG.info("Renaming " + srcPath + " to " + dstPath); + renameTries.incrementAndGet(); fs.rename(srcPath, dstPath); // could block } return null; http://git-wip-us.apache.org/repos/asf/flume/blob/13a03003/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java index ca4f852..4443335 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java @@ -37,13 +37,22 @@ public class MockFileSystem extends FileSystem { LoggerFactory.getLogger(MockFileSystem.class); FileSystem fs; - int numberOfClosesRequired; + int numberOfRetriesRequired; MockFsDataOutputStream latestOutputStream; + int currentRenameAttempts; + boolean closeSucceed = true; public MockFileSystem(FileSystem fs, - int numberOfClosesRequired) { + int numberOfRetriesRequired) { this.fs = fs; - this.numberOfClosesRequired = numberOfClosesRequired; + this.numberOfRetriesRequired = numberOfRetriesRequired; + } + + public MockFileSystem(FileSystem fs, + int numberOfRetriesRequired, boolean closeSucceed) { + this.fs = fs; + this.numberOfRetriesRequired = numberOfRetriesRequired; + this.closeSucceed = closeSucceed; } @Override @@ -51,7 +60,7 @@ public class MockFileSystem extends 
FileSystem { throws IOException { latestOutputStream = new MockFsDataOutputStream( - fs.append(arg0, arg1, arg2), numberOfClosesRequired); + fs.append(arg0, arg1, arg2), closeSucceed); return latestOutputStream; } @@ -60,7 +69,7 @@ public class MockFileSystem extends FileSystem { public FSDataOutputStream create(Path arg0) throws IOException { //throw new IOException ("HI there2"); latestOutputStream = new MockFsDataOutputStream( - fs.create(arg0), numberOfClosesRequired); + fs.create(arg0), closeSucceed); return latestOutputStream; } @@ -116,8 +125,17 @@ public class MockFileSystem extends FileSystem { @Override public boolean rename(Path arg0, Path arg1) throws IOException { - - return fs.rename(arg0, arg1); + currentRenameAttempts++; + logger.info( + "Attempting to Rename: '" + currentRenameAttempts + "' of '" + + numberOfRetriesRequired + "'"); + if (currentRenameAttempts >= numberOfRetriesRequired || + numberOfRetriesRequired == 0) { + logger.info("Renaming file"); + return fs.rename(arg0, arg1); + } else { + throw new IOException("MockIOException"); + } } @Override @@ -125,16 +143,4 @@ public class MockFileSystem extends FileSystem { fs.setWorkingDirectory(arg0); } - - public boolean isFileClosed(Path path) { - - logger.info("isFileClosed: '" + - latestOutputStream.getCurrentCloseAttempts() + "' , '" + - numberOfClosesRequired + "'"); - return latestOutputStream.getCurrentCloseAttempts() >= - numberOfClosesRequired || numberOfClosesRequired == 0; - } - - - } http://git-wip-us.apache.org/repos/asf/flume/blob/13a03003/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java index 5bbacae..35b034e 100644 --- 
a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java @@ -28,34 +28,24 @@ public class MockFsDataOutputStream extends FSDataOutputStream{ private static final Logger logger = LoggerFactory.getLogger(MockFsDataOutputStream.class); - int currentCloseAttempts = 0; - int numberOfClosesRequired; + boolean closeSucceed; public MockFsDataOutputStream(FSDataOutputStream wrapMe, - int numberOfClosesRequired) + boolean closeSucceed) throws IOException { super(wrapMe.getWrappedStream(), null); - - this.numberOfClosesRequired = numberOfClosesRequired; - + this.closeSucceed = closeSucceed; } @Override public void close() throws IOException { - currentCloseAttempts++; logger.info( - "Attempting to Close: '" + currentCloseAttempts + "' of '" + - numberOfClosesRequired + "'"); - if (currentCloseAttempts >= numberOfClosesRequired || - numberOfClosesRequired == 0) { + "Close Succeeded - " + closeSucceed); + if (closeSucceed) { logger.info("closing file"); super.close(); } else { throw new IOException("MockIOException"); } } - - public int getCurrentCloseAttempts() { - return currentCloseAttempts; - } } http://git-wip-us.apache.org/repos/asf/flume/blob/13a03003/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java index bcb912f..7c74b16 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java @@ -408,15 +408,18 @@ public class TestBucketWriter { @Test - public void 
testSequenceFileCloseRetries() throws Exception { - SequenceFileCloseRetryCoreTest(1); - SequenceFileCloseRetryCoreTest(5); - SequenceFileCloseRetryCoreTest(2); + public void testSequenceFileRenameRetries() throws Exception { + SequenceFileRenameRetryCoreTest(1, true); + SequenceFileRenameRetryCoreTest(5, true); + SequenceFileRenameRetryCoreTest(2, true); - } + SequenceFileRenameRetryCoreTest(1, false); + SequenceFileRenameRetryCoreTest(5, false); + SequenceFileRenameRetryCoreTest(2, false); + } - public void SequenceFileCloseRetryCoreTest(int numberOfClosesRequired) throws Exception { + public void SequenceFileRenameRetryCoreTest(int numberOfRetriesRequired, boolean closeSucceed) throws Exception { String hdfsPath = "file:///tmp/flume-test." + Calendar.getInstance().getTimeInMillis() + "." + Thread.currentThread().getId(); @@ -429,13 +432,13 @@ public class TestBucketWriter { fs.mkdirs(dirPath); context.put("hdfs.path", hdfsPath); context.put("hdfs.closeTries", - String.valueOf(numberOfClosesRequired)); + String.valueOf(numberOfRetriesRequired)); context.put("hdfs.rollCount", "1"); context.put("hdfs.retryInterval", "1"); context.put("hdfs.callTimeout", Long.toString(1000)); MockFileSystem mockFs = new MockFileSystem(fs, - numberOfClosesRequired); + numberOfRetriesRequired, closeSucceed); BucketWriter bucketWriter = new BucketWriter(0, 0, 1, 1, ctx, hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null, null, new MockDataStream(mockFs), @@ -443,7 +446,7 @@ public class TestBucketWriter { new SinkCounter( "test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 1, - numberOfClosesRequired); + numberOfRetriesRequired); bucketWriter.setFileSystem(mockFs); // At this point, we checked if isFileClosed is available in @@ -453,12 +456,11 @@ public class TestBucketWriter { // This is what triggers the close, so a 2nd append is required :/ bucketWriter.append(event); - TimeUnit.SECONDS.sleep(numberOfClosesRequired + 
2); + TimeUnit.SECONDS.sleep(numberOfRetriesRequired + 2); - int expectedNumberOfCloses = numberOfClosesRequired; - Assert.assertTrue("Expected " + expectedNumberOfCloses + " " + - "but got " + bucketWriter.closeTries.get(), - bucketWriter.closeTries.get() == - expectedNumberOfCloses); + Assert.assertTrue("Expected " + numberOfRetriesRequired + " " + + "but got " + bucketWriter.renameTries.get(), + bucketWriter.renameTries.get() == + numberOfRetriesRequired); } } http://git-wip-us.apache.org/repos/asf/flume/blob/13a03003/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java index f29f1f1..1b7a364 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java @@ -1375,10 +1375,16 @@ public class TestHDFSEventSink { Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount()); } @Test - public void testRetryClose() throws InterruptedException, + public void testRetryRename() throws InterruptedException, LifecycleException, EventDeliveryException, IOException { + testRetryRename(true); + testRetryRename(false); + } + private void testRetryRename(boolean closeSucceed) throws InterruptedException, + LifecycleException, + EventDeliveryException, IOException { LOG.debug("Starting..."); String newPath = testPath + "/retryBucket"; @@ -1388,7 +1394,7 @@ public class TestHDFSEventSink { Path dirPath = new Path(newPath); fs.delete(dirPath, true); fs.mkdirs(dirPath); - MockFileSystem mockFs = new MockFileSystem(fs, 3); + MockFileSystem mockFs = new MockFileSystem(fs, 6, closeSucceed); Context context = 
getContextForRetryTests(); Configurables.configure(sink, context); @@ -1434,15 +1440,15 @@ public class TestHDFSEventSink { Collection<BucketWriter> writers = sink.getSfWriters().values(); - int totalCloseAttempts = 0; + int totalRenameAttempts = 0; for(BucketWriter writer: writers) { - LOG.info("Close tries = "+ writer.closeTries.get()); - totalCloseAttempts += writer.closeTries.get(); + LOG.info("Rename tries = "+ writer.renameTries.get()); + totalRenameAttempts += writer.renameTries.get(); } // stop clears the sfWriters map, so we need to compute the // close tries count before stopping the sink. sink.stop(); - Assert.assertEquals(6, totalCloseAttempts); + Assert.assertEquals(6, totalRenameAttempts); } }
