Addressing review comments from Arun.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0b07f8b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0b07f8b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0b07f8b3

Branch: refs/heads/1.x-branch
Commit: 0b07f8b3a8a458f39cc9f64be1e5623b0a6815d2
Parents: b5240a7
Author: Roshan Naik <[email protected]>
Authored: Thu Jan 7 16:25:45 2016 -0800
Committer: Roshan Naik <[email protected]>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 external/storm-hdfs/README.md                   | 10 ++--
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 50 ++++++++++++++------
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  |  1 -
 3 files changed, 41 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0b07f8b3/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index 8008bd8..bf63ad9 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -452,9 +452,9 @@ before selecting the next file for consumption.
 **Lock on *.lock* Directory**
 Hdfs spout instances create a *DIRLOCK* file in the .lock directory to co-ordinate certain accesses to 
 the .lock dir itself. A spout will try to create it when it needs access to the .lock directory and
-then delete it when done.  In case of a topology crash or force kill, this file may not get deleted.
-In this case it should be deleted manually to allow the new topology instance to regain  full access 
-to the  .lock  directory and resume normal processing. 
+then delete it when done.  In error conditions such as a topology crash, force kill or untimely death 
+of a spout, this file may not get deleted. Future running instances of the spout will eventually recover
+this once the DIRLOCK file becomes stale due to inactivity for hdfsspout.lock.timeout.sec seconds.
 
 ## Usage
 
@@ -515,13 +515,13 @@ Only settings mentioned in **bold** are required.
 |**hdfsspout.source.dir**      |             | HDFS location from where to read.  E.g. /data/inputfiles  |
 |**hdfsspout.archive.dir**     |             | After a file is processed completely it will be moved to this directory. E.g. /data/done|
 |**hdfsspout.badfiles.dir**    |             | if there is an error parsing a file's contents, the file is moved to this location.  E.g. /data/badfiles  |
-|hdfsspout.lock.dir            | '.lock' subdirectory under hdfsspout.source.dir | Dir in which lock files will be created. Concurrent HDFS spout instances synchronize using *lock* files. Before processing a file the spout instance creates a lock file in this directory with same name as input file and deletes this lock file after processing the file. Spout also periodically makes a note of its progress (wrt reading the input file) in the lock file so that another spout instance can resume progress on the same file if the spout dies for any reason. When a toplogy is killed, if a .lock/DIRLOCK file is left behind it can be safely deleted to allow normal resumption of the topology on restart.|
+|hdfsspout.lock.dir            | '.lock' subdirectory under hdfsspout.source.dir | Dir in which lock files will be created. Concurrent HDFS spout instances synchronize using *lock* files. Before processing a file the spout instance creates a lock file in this directory with same name as input file and deletes this lock file after processing the file. Spouts also periodically make a note of their progress (wrt reading the input file) in the lock file so that another spout instance can resume progress on the same file if the spout dies for any reason.|
 |hdfsspout.ignore.suffix       |   .ignore   | File names with this suffix in the hdfsspout.source.dir location will not be processed|
 |hdfsspout.commit.count        |    20000    | Record progress in the lock file after these many records are processed. If set to 0, this criterion will not be used. |
 |hdfsspout.commit.sec          |    10       | Record progress in the lock file after these many seconds have elapsed. Must be greater than 0 |
 |hdfsspout.max.outstanding     |   10000     | Limits the number of unACKed tuples by pausing tuple generation (if ACKers are used in the topology) |
 |hdfsspout.lock.timeout.sec    |  5 minutes  | Duration of inactivity after which a lock file is considered to be abandoned and ready for another spout to take ownership |
-|hdfsspout.clocks.insync       |    true     | Indicates whether clocks on the storm machines are in sync (using services like NTP)       |
+|hdfsspout.clocks.insync       |    true     | Indicates whether clocks on the storm machines are in sync (using services like NTP). Used for detecting stale locks. |
 |hdfs.config (unless changed)  |             | Set it to a Map of Key/value pairs indicating the HDFS settings to be used. For example, keytab and principal could be set using this. See section **Using keytabs on all worker hosts** under HDFS bolt below.|
 
 ---
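As a quick illustration of the settings table above: these are plain topology-config keys. The sketch below builds such a map using only the key names and defaults documented in the README diff; a `java.util.HashMap` stands in for Storm's `Config` (which extends `HashMap`), and the directory paths are the example values from the table, not real cluster paths.

```java
import java.util.HashMap;
import java.util.Map;

public class HdfsSpoutConfigSketch {
    // Builds a config map using the keys documented in the README table.
    static Map<String, Object> buildSpoutConf() {
        Map<String, Object> conf = new HashMap<>();
        // Required settings (example values from the table).
        conf.put("hdfsspout.source.dir", "/data/inputfiles");
        conf.put("hdfsspout.archive.dir", "/data/done");
        conf.put("hdfsspout.badfiles.dir", "/data/badfiles");
        // Optional settings, shown here with their documented defaults.
        conf.put("hdfsspout.commit.count", 20000);   // commit progress every N records
        conf.put("hdfsspout.commit.sec", 10);        // ...or every N seconds
        conf.put("hdfsspout.max.outstanding", 10000);
        conf.put("hdfsspout.clocks.insync", true);   // needed for stale-lock detection
        return conf;
    }
}
```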

http://git-wip-us.apache.org/repos/asf/storm/blob/0b07f8b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 93d08d5..994d87e 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -258,13 +258,19 @@ public class HdfsSpout extends BaseRichSpout {
 
     reader.close();
     reader = null;
+    releaseLockAndLog(lock, spoutId);
+    lock = null;
+  }
+
+  private static void releaseLockAndLog(FileLock fLock, String spoutId) {
     try {
-      lock.release();
-      LOG.debug("Spout {} released FileLock. SpoutId = {}", lock.getLockFile(), spoutId);
+      if (fLock != null) {
+        fLock.release();
+        LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId);
+      }
     } catch (IOException e) {
-      LOG.error("Unable to delete lock file : " + this.lock.getLockFile() + " SpoutId =" + spoutId, e);
+      LOG.error("Unable to delete lock file : " + fLock.getLockFile() + " SpoutId =" + spoutId, e);
     }
-    lock = null;
   }
 
   protected void emitData(List<Object> tuple, MessageId id) {
@@ -475,7 +481,7 @@ public class HdfsSpout extends BaseRichSpout {
     }
   }
 
-  private FileReader pickNextFile()  {
+  private FileReader pickNextFile() {
     try {
       // 1) If there are any abandoned files, pick oldest one
       lock = getOldestExpiredLock();
@@ -491,19 +497,19 @@ public class HdfsSpout extends BaseRichSpout {
       Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0);
 
       for (Path file : listing) {
-        if( file.getName().endsWith(inprogress_suffix) ) {
+        if (file.getName().endsWith(inprogress_suffix)) {
           continue;
         }
-        if( file.getName().endsWith(ignoreSuffix) ) {
+        if (file.getName().endsWith(ignoreSuffix)) {
           continue;
         }
         lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
-        if( lock==null ) {
+        if (lock == null) {
           LOG.debug("Unable to get FileLock for {}, so skipping it.", file);
           continue; // could not lock, so try another file.
         }
         LOG.info("Processing : {} ", file);
-        Path newFile = renameSelectedFile(file);
+        Path newFile = renameToInProgressFile(file);
         return createFileReader(newFile);
       }
 
@@ -624,14 +630,18 @@ public class HdfsSpout extends BaseRichSpout {
     }
   }
 
-  // returns new path of renamed file
-  private Path renameSelectedFile(Path file)
+  /**
+   * Renames files with .inprogress suffix
+   * @return path of renamed file
+   * @throws IOException if the rename operation fails
+   */
+  private Path renameToInProgressFile(Path file)
           throws IOException {
     Path newFile =  new Path( file.toString() + inprogress_suffix );
-    if( ! hdfs.rename(file, newFile) ) {
-      throw new IOException("Rename failed for file: " + file);
+    if (hdfs.rename(file, newFile)) {
+      return newFile;
     }
-    return newFile;
+    throw new IOException("Rename of " + file + " to " + newFile + " failed");
   }
 
   /** Returns the corresponding input file in the 'sourceDirPath' for the specified lock file.
@@ -699,4 +709,16 @@ public class HdfsSpout extends BaseRichSpout {
     }
   }
 
+  private static class RenameFailedException extends IOException {
+    public final Path file;
+    public RenameFailedException(Path file) {
+      super("Rename failed for file: " + file);
+      this.file = file;
+    }
+
+    public RenameFailedException(Path file, IOException e) {
+      super("Rename failed for file: " + file, e);
+      this.file = file;
+    }
+  }
 }
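The renamed `renameToInProgressFile` above appends the `.inprogress` suffix and fails loudly when the rename does not happen. The standalone sketch below shows the same pattern with `java.nio.file` standing in for the HDFS `FileSystem` API; the class and method names here are illustrative only and are not part of storm-hdfs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class InProgressRenameSketch {
    static final String INPROGRESS_SUFFIX = ".inprogress";

    // Mirrors: new Path(file.toString() + inprogress_suffix)
    static Path inProgressPath(Path file) {
        return Paths.get(file.toString() + INPROGRESS_SUFFIX);
    }

    // Mirrors renameToInProgressFile(): rename, returning the new path,
    // and surface failure as an IOException rather than a silent false.
    static Path renameToInProgress(Path file) throws IOException {
        Path newFile = inProgressPath(file);
        Files.move(file, newFile); // throws IOException if the move fails
        return newFile;
    }
}
```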

http://git-wip-us.apache.org/repos/asf/storm/blob/0b07f8b3/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 0412126..835a714 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -103,7 +103,6 @@ public class TestHdfsSpout {
     fs.mkdirs(archive);
     badfiles = new Path(baseFolder.toString() + "/bad");
     fs.mkdirs(badfiles);
-
   }
 
   @After
