Addressing another review comment from Arun about releasing lock file on exception.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2c02bc91 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2c02bc91 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2c02bc91 Branch: refs/heads/1.x-branch Commit: 2c02bc91d8a9b81b55a4e023c927a73068bcc927 Parents: 0b07f8b Author: Roshan Naik <[email protected]> Authored: Thu Jan 7 19:23:19 2016 -0800 Committer: Roshan Naik <[email protected]> Committed: Thu Jan 14 11:34:57 2016 -0800 ---------------------------------------------------------------------- .../org/apache/storm/hdfs/spout/HdfsSpout.java | 45 +++++++++++++------- 1 file changed, 30 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/2c02bc91/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index 994d87e..5428570 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -508,9 +508,16 @@ public class HdfsSpout extends BaseRichSpout { LOG.debug("Unable to get FileLock for {}, so skipping it.", file); continue; // could not lock, so try another file. } - LOG.info("Processing : {} ", file); - Path newFile = renameToInProgressFile(file); - return createFileReader(newFile); + try { + Path newFile = renameToInProgressFile(file); + FileReader result = createFileReader(newFile); + LOG.info("Processing : {} ", file); + return result; + } catch (Exception e) { + LOG.error("Skipping file " + file, e); + releaseLockAndLog(lock, spoutId); + continue; + } } return null; @@ -599,7 +606,7 @@ public class HdfsSpout extends BaseRichSpout { return (FileReader) constructor.newInstance(this.hdfs, file, conf); } catch (Exception e) { LOG.error(e.getMessage(), e); - throw new RuntimeException("Unable to instantiate " + readerType, e); + throw new RuntimeException("Unable to instantiate " + readerType + " reader", e); } } @@ -638,10 +645,14 @@ public class HdfsSpout extends BaseRichSpout { private Path renameToInProgressFile(Path file) throws IOException { Path newFile = new Path( file.toString() + inprogress_suffix ); - if (hdfs.rename(file, newFile)) { - return newFile; + try { + if (hdfs.rename(file, newFile)) { + return newFile; + } + throw new RenameException(file, newFile); + } catch (IOException e){ + throw new RenameException(file, newFile, e); } - throw new IOException("Rename of " + file + " to " + newFile + " failed"); } /** Returns the corresponding input file in the 'sourceDirPath' for the specified lock file. @@ -709,16 +720,20 @@ public class HdfsSpout extends BaseRichSpout { } } - private static class RenameFailedException extends IOException { - public final Path file; - public RenameFailedException(Path file) { - super("Rename failed for file: " + file); - this.file = file; + private static class RenameException extends IOException { + public final Path oldFile; + public final Path newFile; + + public RenameException(Path oldFile, Path newFile) { + super("Rename of " + oldFile + " to " + newFile + " failed"); + this.oldFile = oldFile; + this.newFile = newFile; } - public RenameFailedException(Path file, IOException e) { - super("Rename failed for file: " + file, e); - this.file = file; + public RenameException(Path oldFile, Path newFile, IOException cause) { + super("Rename of " + oldFile + " to " + newFile + " failed", cause); + this.oldFile = oldFile; + this.newFile = newFile; } } }
