Updated Branches: refs/heads/trunk a34cdb0ea -> 5b5470bd5
FLUME-2007. HDFS Sink should check if file is closed and retry if it is not. (Ted Malaska via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5b5470bd Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5b5470bd Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5b5470bd Branch: refs/heads/trunk Commit: 5b5470bd5d3e94842032009c36788d4ae346674b Parents: a34cdb0 Author: Hari Shreedharan <[email protected]> Authored: Tue Jul 16 15:12:11 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Tue Jul 16 15:12:59 2013 -0700 ---------------------------------------------------------------------- .../flume/sink/hdfs/AbstractHDFSWriter.java | 64 +++++++++ .../sink/hdfs/HDFSCompressedDataStream.java | 2 +- .../apache/flume/sink/hdfs/HDFSDataStream.java | 2 +- .../flume/sink/hdfs/HDFSSequenceFile.java | 8 +- .../hdfs/MockFileSystemCloseRetryWrapper.java | 142 +++++++++++++++++++ ...MockFsDataOutputStreamCloseRetryWrapper.java | 73 ++++++++++ .../sink/hdfs/TestUseRawLocalFileSystem.java | 52 +++++++ 7 files changed, 340 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java index bc3b383..da0466d 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -44,6 +45,8 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { private Method refGetNumCurrentReplicas = null; private Method refGetDefaultReplication = null; private Integer configuredMinReplicas = null; + private Integer numberOfCloseRetries = null; + private long timeBetweenCloseRetries = Long.MAX_VALUE; final static Object [] NO_ARGS = new Object []{}; @@ -54,6 +57,17 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { Preconditions.checkArgument(configuredMinReplicas >= 0, "hdfs.minBlockReplicas must be greater than or equal to 0"); } + numberOfCloseRetries = context.getInteger("hdfs.closeTries", 1) - 1; + + if (numberOfCloseRetries > 1) { + try { + timeBetweenCloseRetries = context.getLong("hdfs.callTimeout", 10000l); + } catch (NumberFormatException e) { + logger.warn("hdfs.callTimeout can not be parsed to a long: " + context.getLong("hdfs.callTimeout")); + } + timeBetweenCloseRetries = Math.max(timeBetweenCloseRetries/numberOfCloseRetries, 1000); + } + } /** @@ -97,6 +111,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { this.destPath = destPath; this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream); this.refGetDefaultReplication = reflectGetDefaultReplication(fs); + } protected void unregisterCurrentStream() { @@ -212,4 +227,53 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { return m; } + /** + * This will + * @param outputStream + * @throws IOException + */ + protected void closeHDFSOutputStream(OutputStream outputStream) + throws IOException { + try { + outputStream.close(); + + if (numberOfCloseRetries > 0) { + try { + Method isFileClosedMethod = getIsFileClosedMethod(); + int closeAttemptsMade = 0; + if (isFileClosedMethod != null) { + while (closeAttemptsMade < numberOfCloseRetries.intValue() && + Boolean.FALSE.equals(isFileClosedMethod.invoke(fs, destPath))) { + closeAttemptsMade++; + logger.debug("Waiting: '" + timeBetweenCloseRetries + "' before retry close"); + Thread.sleep(timeBetweenCloseRetries); + try { + outputStream.close(); + } catch (IOException e) { + logger.error("Unable to close HDFS file: '" + destPath + "'"); + } + } + if (closeAttemptsMade == numberOfCloseRetries.intValue()) { + logger.warn("Failed to close '" + destPath + "' is " + + numberOfCloseRetries + " retries, over " + (timeBetweenCloseRetries * numberOfCloseRetries) + " millseconds"); + } + } + } catch (Exception e) { + logger.error("Failed to close '" + destPath + "' is " + + numberOfCloseRetries + " retries, over " + (timeBetweenCloseRetries * numberOfCloseRetries) + " millseconds", e); + } + } + } catch (IOException e) { + logger.error("Unable to close HDFS file: '" + destPath + "'"); + } + } + + private Method getIsFileClosedMethod() { + try { + return fs.getClass().getMethod("isFileClosed", Path.class); + } catch (Exception e) { + return null; + } + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java index 2c2be6a..5518547 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java @@ -147,7 +147,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter { } fsOut.flush(); fsOut.sync(); - cmpOut.close(); + closeHDFSOutputStream(cmpOut); unregisterCurrentStream(); } http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java index b8214be..e20d1ee 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java @@ -123,7 +123,7 @@ public class HDFSDataStream extends AbstractHDFSWriter { serializer.beforeClose(); outStream.flush(); outStream.sync(); - outStream.close(); + closeHDFSOutputStream(outStream); unregisterCurrentStream(); } http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java index 0383744..5fe9f1b 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java @@ -75,6 +75,12 @@ public class HDFSSequenceFile extends AbstractHDFSWriter { Configuration conf = new Configuration(); Path dstPath = new Path(filePath); FileSystem hdfs = dstPath.getFileSystem(conf); + open(dstPath, codeC, compType, conf, hdfs); + } + + protected void open(Path dstPath, CompressionCodec codeC, + CompressionType compType, Configuration conf, FileSystem hdfs) + throws IOException { if(useRawLocalFileSystem) { if(hdfs instanceof LocalFileSystem) { hdfs = ((LocalFileSystem)hdfs).getRaw(); @@ -110,7 +116,7 @@ public class HDFSSequenceFile extends AbstractHDFSWriter { @Override public void close() throws IOException { writer.close(); - outStream.close(); + closeHDFSOutputStream(outStream); unregisterCurrentStream(); } http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java new file mode 100644 index 0000000..b5d89e6 --- /dev/null +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystemCloseRetryWrapper.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.hdfs; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockFileSystemCloseRetryWrapper extends FileSystem{ + + private static final Logger logger = + LoggerFactory.getLogger(MockFileSystemCloseRetryWrapper.class); + + FileSystem fs; + int numberOfClosesRequired; + boolean throwExceptionsOfFailedClose; + MockFsDataOutputStreamCloseRetryWrapper latestOutputStream; + + public MockFileSystemCloseRetryWrapper (FileSystem fs, + int numberOfClosesRequired, boolean throwExceptionsOfFailedClose) { + this.fs = fs; + this.throwExceptionsOfFailedClose = throwExceptionsOfFailedClose; + this.numberOfClosesRequired = numberOfClosesRequired; + } + + public MockFsDataOutputStreamCloseRetryWrapper getLastMockOutputStream() { + return latestOutputStream; + } + + @Override + public FSDataOutputStream append(Path arg0, int arg1, Progressable arg2) + throws IOException { + + latestOutputStream = new MockFsDataOutputStreamCloseRetryWrapper(fs.append(arg0, arg1, arg2), numberOfClosesRequired, throwExceptionsOfFailedClose); + + return latestOutputStream; + } + + @Override + public FSDataOutputStream create(Path arg0) throws IOException { + //throw new IOException ("HI there2"); + latestOutputStream = new MockFsDataOutputStreamCloseRetryWrapper(fs.create(arg0), numberOfClosesRequired, throwExceptionsOfFailedClose); + + return latestOutputStream; + } + + @Override + public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2, + int arg3, short arg4, long arg5, Progressable arg6) throws IOException { + throw new IOException ("Not a real file system"); + //return new MockFsDataOutputStreamCloseRetryWrapper(fs.create(arg0, arg1, arg2, arg3, arg4, arg5, arg6), numberOfClosesRequired, throwExceptionsOfFailedClose); + } + + @Override + @Deprecated + public boolean delete(Path arg0) throws IOException { + return fs.delete(arg0); + } + + @Override + public boolean delete(Path arg0, boolean arg1) throws IOException { + return fs.delete(arg0, arg1); + } + + @Override + public FileStatus getFileStatus(Path arg0) throws IOException { + return fs.getFileStatus(arg0); + } + + @Override + public URI getUri() { + return fs.getUri(); + } + + @Override + public Path getWorkingDirectory() { + return fs.getWorkingDirectory(); + } + + @Override + public FileStatus[] listStatus(Path arg0) throws IOException { + return fs.listStatus(arg0); + } + + @Override + public boolean mkdirs(Path arg0, FsPermission arg1) throws IOException { + // TODO Auto-generated method stub + return fs.mkdirs(arg0, arg1); + } + + @Override + public FSDataInputStream open(Path arg0, int arg1) throws IOException { + return fs.open(arg0, arg1); + } + + @Override + public boolean rename(Path arg0, Path arg1) throws IOException { + + return fs.rename(arg0, arg1); + } + + @Override + public void setWorkingDirectory(Path arg0) { + fs.setWorkingDirectory(arg0); + + } + + public boolean isFileClosed(Path path) { + + logger.info("isFileClosed: '" + latestOutputStream.getCurrentCloseAttempts() + "' , '" + numberOfClosesRequired + "'"); + + return latestOutputStream.getCurrentCloseAttempts() >= numberOfClosesRequired || numberOfClosesRequired == 0; + } + + + +} http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java new file mode 100644 index 0000000..1d8c140 --- /dev/null +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStreamCloseRetryWrapper.java @@ -0,0 +1,73 @@ +/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ +package org.apache.flume.sink.hdfs; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem.Statistics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockFsDataOutputStreamCloseRetryWrapper extends FSDataOutputStream{ + + private static final Logger logger = + LoggerFactory.getLogger(MockFsDataOutputStreamCloseRetryWrapper.class); + + int currentCloseAttempts = 0; + int numberOfClosesRequired; + boolean throwExceptionsOfFailedClose; + + public MockFsDataOutputStreamCloseRetryWrapper(FSDataOutputStream wrapMe, + int numberOfClosesRequired, boolean throwExceptionsOfFailedClose) + throws IOException { + super(wrapMe.getWrappedStream(), null); + + this.numberOfClosesRequired = numberOfClosesRequired; + this.throwExceptionsOfFailedClose = throwExceptionsOfFailedClose; + + } + + public MockFsDataOutputStreamCloseRetryWrapper(OutputStream out, + Statistics stats) throws IOException { + super(out, stats); + + } + + @Override + public void close() throws IOException { + currentCloseAttempts++; + logger.info("Attempting to Close: '" + currentCloseAttempts + "' of '" + numberOfClosesRequired + "'"); + if (currentCloseAttempts > numberOfClosesRequired || numberOfClosesRequired == 0) { + logger.info("closing file"); + super.close(); + } else { + if (throwExceptionsOfFailedClose) { + logger.info("no closed and throwing exception"); + throw new IOException("MockIOException"); + } else { + logger.info("no closed and doing nothing"); + } + } + } + + public int getCurrentCloseAttempts() { + return currentCloseAttempts; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/5b5470bd/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java index ffbdde0..4476530 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestUseRawLocalFileSystem.java @@ -30,6 +30,9 @@ import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; import org.apache.flume.instrumentation.SinkCounter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionCodec; @@ -102,4 +105,53 @@ public class TestUseRawLocalFileSystem { stream.sync(); Assert.assertTrue(testFile.length() > 0); } + + @Test + public void testSequenceFileCloseRetries() throws Exception { + SequenceFileCloseRetryCoreTest(3, 0, false); + SequenceFileCloseRetryCoreTest(3, 1, false); + SequenceFileCloseRetryCoreTest(3, 5, false); + + SequenceFileCloseRetryCoreTest(3, 0, true); + SequenceFileCloseRetryCoreTest(3, 1, true); + SequenceFileCloseRetryCoreTest(3, 5, true); + + SequenceFileCloseRetryCoreTest(3, 2, true); + SequenceFileCloseRetryCoreTest(3, 2, true); + + SequenceFileCloseRetryCoreTest(0, 0, true); + SequenceFileCloseRetryCoreTest(1, 0, true); + } + + + public void SequenceFileCloseRetryCoreTest(int numberOfCloseRetriesToAttempt, int numberOfClosesRequired, boolean throwExceptionsOfFailedClose) throws Exception { + String file = testFile.getCanonicalPath(); + HDFSSequenceFile stream = new HDFSSequenceFile(); + context.put("hdfs.useRawLocalFileSystem", "true"); + context.put("hdfs.closeTries", String.valueOf(numberOfCloseRetriesToAttempt)); + Configuration conf = new Configuration(); + Path dstPath = new Path(file); + MockFileSystemCloseRetryWrapper mockFs = new MockFileSystemCloseRetryWrapper(dstPath.getFileSystem(conf), numberOfClosesRequired, throwExceptionsOfFailedClose); + stream.configure(context); + stream.open(dstPath, null, CompressionType.NONE, conf, mockFs); + stream.append(event); + stream.sync(); + + stream.close(); + + if (throwExceptionsOfFailedClose) { + int expectedNumberOfCloses = 1; + Assert.assertTrue("Expected " + expectedNumberOfCloses + " but got " + mockFs.getLastMockOutputStream().getCurrentCloseAttempts() , mockFs.getLastMockOutputStream().currentCloseAttempts == expectedNumberOfCloses); + } else { + int expectedNumberOfCloses = Math.max(Math.min(numberOfClosesRequired, numberOfCloseRetriesToAttempt), 1); + Assert.assertTrue("Expected " + expectedNumberOfCloses + " but got " + mockFs.getLastMockOutputStream().getCurrentCloseAttempts() , mockFs.getLastMockOutputStream().currentCloseAttempts == expectedNumberOfCloses); + } + + + + + + } + + } \ No newline at end of file
