Fix TextFileReader resume-of-abandoned-file functionality. Added unit test.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/721c9b3d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/721c9b3d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/721c9b3d

Branch: refs/heads/1.x-branch
Commit: 721c9b3d1a7a47cda19ca4867a584626c84823f4
Parents: 1e52f08
Author: Roshan Naik <[email protected]>
Authored: Tue Dec 22 19:24:42 2015 -0800
Committer: Roshan Naik <[email protected]>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/common/HdfsUtils.java |  1 +
 .../org/apache/storm/hdfs/spout/DirLock.java    |  9 +++-
 .../org/apache/storm/hdfs/spout/FileLock.java   | 34 +++++++-----
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 40 ++++++++------
 .../storm/hdfs/spout/SequenceFileReader.java    |  4 +-
 .../apache/storm/hdfs/spout/TextFileReader.java | 57 ++++++++++++--------
 .../apache/storm/hdfs/spout/TestFileLock.java   |  2 +-
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 51 +++++++++++++++++-
 .../storm/hdfs/spout/TestProgressTracker.java   |  2 +-
 9 files changed, 144 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index 86b9ee8..5ec5333 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.ipc.RemoteException;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index 06ca749..0e1182f 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -21,6 +21,7 @@ package org.apache.storm.hdfs.spout;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.storm.hdfs.common.HdfsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,8 +102,14 @@ public class DirLock {
     }
   }
 
-
   private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) throws 
IOException {
+    if(fs instanceof DistributedFileSystem) {
+      if (!((DistributedFileSystem) fs).recoverLease(dirLockFile)) {
+        log.warn("Unable to recover lease on dir lock file " + dirLockFile + " 
right now. Cannot transfer ownership. Will need to try later.");
+        return null;
+      }
+    }
+
     // delete and recreate lock file
     if( fs.delete(dirLockFile, false) ) { // returns false if somebody else 
already deleted it (to take ownership)
       FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, dirLockFile);

http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index 89ed855..c64336d 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.storm.hdfs.common.HdfsUtils;
@@ -65,7 +66,7 @@ public class FileLock {
     this.lockFile = lockFile;
     this.lockFileStream =  fs.append(lockFile);
     this.componentID = spoutId;
-    log.debug("Acquired abandoned lockFile {}", lockFile);
+    log.debug("Acquired abandoned lockFile {}, Spout {}", lockFile, spoutId);
     logProgress(entry.fileOffset, true);
   }
 
@@ -95,13 +96,13 @@ public class FileLock {
   public void release() throws IOException {
     lockFileStream.close();
     if(!fs.delete(lockFile, false)){
-      log.warn("Unable to delete lock file");
+      log.warn("Unable to delete lock file, Spout = {}", componentID);
       throw new IOException("Unable to delete lock file");
     }
-    log.debug("Released lock file {}", lockFile);
+    log.debug("Released lock file {}. Spout {}", lockFile, componentID);
   }
 
-  // for testing only.. invoked via reflection
+  // For testing only.. invoked via reflection
   private void forceCloseLockFile() throws IOException {
     lockFileStream.close();
   }
@@ -115,14 +116,14 @@ public class FileLock {
     try {
       FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
       if (ostream != null) {
-        log.debug("Acquired lock on file {}. LockFile=", fileToLock, lockFile);
+        log.debug("Acquired lock on file {}. LockFile= {}, Spout = {}", 
fileToLock, lockFile, spoutId);
         return new FileLock(fs, lockFile, ostream, spoutId);
       } else {
-        log.debug("Cannot lock file {} as its already locked.", fileToLock);
+        log.debug("Cannot lock file {} as its already locked. Spout = {}", 
fileToLock, spoutId);
         return null;
       }
     } catch (IOException e) {
-      log.error("Error when acquiring lock on file " + fileToLock, e);
+      log.error("Error when acquiring lock on file " + fileToLock + " Spout = 
" + spoutId, e);
       throw e;
     }
   }
@@ -147,7 +148,6 @@ public class FileLock {
       if(lastEntry==null) {
         throw new RuntimeException(lockFile.getName() + " is empty. this file 
is invalid.");
       }
-      log.error("{} , lastModified= {},  expiryTime= {},  diff= {}", lockFile, 
lastEntry.eventTime, olderThan,  lastEntry.eventTime-olderThan );
       if( lastEntry.eventTime <= olderThan )
         return lastEntry;
     }
@@ -181,17 +181,25 @@ public class FileLock {
    *                    file is stale. so this value comes from that earlier 
scan.
    * @param spoutId     spout id
    * @throws IOException if unable to acquire
-   * @return null if lock File is being used by another thread
+   * @return null if lock File is not recoverable
    */
   public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry 
lastEntry, String spoutId)
           throws IOException {
     try {
+      if(fs instanceof DistributedFileSystem ) {
+        if( !((DistributedFileSystem) fs).recoverLease(lockFile) ) {
+          log.warn("Unable to recover lease on lock file {} right now. Cannot 
transfer ownership. Will need to try later. Spout = {}" , lockFile , spoutId);
+          return null;
+        }
+      }
       return new FileLock(fs, lockFile, spoutId, lastEntry);
-    } catch (RemoteException e) {
-      if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
-        log.warn("Lock file {} is currently open. Cannot transfer ownership 
now. Will try later.", lockFile);
+    } catch (IOException e) {
+      if (e instanceof RemoteException &&
+              ((RemoteException) e).unwrapRemoteException() instanceof 
AlreadyBeingCreatedException) {
+        log.warn("Lock file " + lockFile  + "is currently open. Cannot 
transfer ownership now. Will need to try later. Spout= " + spoutId, e);
         return null;
       } else { // unexpected error
+        log.warn("Cannot transfer ownership now for lock file " + lockFile + 
". Will need to try later. Spout =" + spoutId, e);
         throw e;
       }
     }
@@ -226,7 +234,7 @@ public class FileLock {
       }
     }
     if(listing.isEmpty())
-      log.info("No abandoned lock files found");
+      log.info("No abandoned lock files found by Spout {}", spoutId);
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 3d95ea7..5a6adf8 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -107,7 +107,7 @@ public class HdfsSpout extends BaseRichSpout {
   }
 
   public void nextTuple() {
-    LOG.debug("Next Tuple");
+    LOG.debug("Next Tuple {}", spoutId);
     // 1) First re-emit any previously failed tuples (from retryList)
     if (!retryList.isEmpty()) {
       LOG.debug("Sending from retry list");
@@ -118,8 +118,8 @@ public class HdfsSpout extends BaseRichSpout {
 
     if( ackEnabled  &&  tracker.size()>=maxDuplicates ) {
       LOG.warn("Waiting for more ACKs before generating new tuples. " +
-               "Progress tracker size has reached limit {}"
-              , maxDuplicates);
+               "Progress tracker size has reached limit {}, SpoutID {}"
+              , maxDuplicates, spoutId);
       // Don't emit anything .. allow configured spout wait strategy to kick in
       return;
     }
@@ -172,8 +172,7 @@ public class HdfsSpout extends BaseRichSpout {
         // spout wait strategy (due to no emits). Instead we go back into the 
loop and
         // generate a tuple from next file
       }
-    }
-
+    } // while
   }
 
   // will commit progress into lock file if commit threshold is reached
@@ -187,7 +186,7 @@ public class HdfsSpout extends BaseRichSpout {
         commitTimeElapsed.set(false);
         setupCommitElapseTimer();
       } catch (IOException e) {
-        LOG.error("Unable to commit progress Will retry later.", e);
+        LOG.error("Unable to commit progress Will retry later. Spout ID = " + 
spoutId, e);
       }
     }
   }
@@ -212,9 +211,9 @@ public class HdfsSpout extends BaseRichSpout {
   private void markFileAsDone(Path filePath) {
     try {
       Path newFile = renameCompletedFile(reader.getFilePath());
-      LOG.info("Completed processing {}", newFile);
+      LOG.info("Completed processing {}. Spout Id = {} ", newFile, spoutId);
     } catch (IOException e) {
-      LOG.error("Unable to archive completed file" + filePath, e);
+      LOG.error("Unable to archive completed file" + filePath + " Spout ID " + 
spoutId, e);
     }
     closeReaderAndResetTrackers();
   }
@@ -225,13 +224,13 @@ public class HdfsSpout extends BaseRichSpout {
     String originalName = new Path(fileNameMinusSuffix).getName();
     Path  newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName);
 
-    LOG.info("Moving bad file {} to {}. Processed it till offset {}", 
originalName, newFile, tracker.getCommitPosition());
+    LOG.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= 
{}", originalName, newFile, tracker.getCommitPosition(), spoutId);
     try {
       if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning 
false or throwing exception
         throw new IOException("Move failed for bad file: " + file); // convert 
false ret value to exception
       }
     } catch (IOException e) {
-      LOG.warn("Error moving bad file: " + file + " to destination " + 
newFile, e);
+      LOG.warn("Error moving bad file: " + file + " to destination " + newFile 
+ " SpoutId =" + spoutId, e);
     }
     closeReaderAndResetTrackers();
   }
@@ -245,8 +244,9 @@ public class HdfsSpout extends BaseRichSpout {
     reader = null;
     try {
       lock.release();
+      LOG.debug("Spout {} released FileLock. SpoutId = {}", 
lock.getLockFile(), spoutId);
     } catch (IOException e) {
-      LOG.error("Unable to delete lock file : " + this.lock.getLockFile(), e);
+      LOG.error("Unable to delete lock file : " + this.lock.getLockFile() + " 
SpoutId =" + spoutId, e);
     }
     lock = null;
   }
@@ -260,7 +260,7 @@ public class HdfsSpout extends BaseRichSpout {
   public void open(Map conf, TopologyContext context,  SpoutOutputCollector 
collector) {
     this.conf = conf;
     final String FILE_SYSTEM = "filesystem";
-    LOG.info("Opening HDFS Spout");
+    LOG.info("Opening HDFS Spout {}", spoutId);
     this.collector = collector;
     this.hdfsConfig = new Configuration();
     this.tupleCounter = 0;
@@ -437,6 +437,7 @@ public class HdfsSpout extends BaseRichSpout {
       // 1) If there are any abandoned files, pick oldest one
       lock = getOldestExpiredLock();
       if (lock != null) {
+        LOG.debug("Spout {} now took over ownership of abandoned FileLock {}" 
, spoutId, lock.getLockFile());
         Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath);
         String resumeFromOffset = lock.getLastLogEntry().fileOffset;
         LOG.info("Resuming processing of abandoned file : {}", file);
@@ -454,7 +455,7 @@ public class HdfsSpout extends BaseRichSpout {
 
         lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
         if( lock==null ) {
-          LOG.debug("Unable to get lock, so skipping file: {}", file);
+          LOG.debug("Unable to get FileLock, so skipping file: {}", file);
           continue; // could not lock, so try another file.
         }
         LOG.info("Processing : {} ", file);
@@ -480,9 +481,15 @@ public class HdfsSpout extends BaseRichSpout {
     DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath);
     if (dirlock == null) {
       dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, 
lockTimeoutSec);
-      if (dirlock == null)
+      if (dirlock == null) {
+        LOG.debug("Spout {} could not take over ownership of DirLock for {}" , 
spoutId, lockDirPath);
         return null;
+      }
+      LOG.debug("Spout {} now took over ownership of abandoned DirLock for {}" 
, spoutId, lockDirPath);
+    } else {
+      LOG.debug("Spout {} now owns DirLock for {}", spoutId, lockDirPath);
     }
+
     try {
       // 2 - if clocks are in sync then simply take ownership of the oldest 
expired lock
       if (clocksInSync)
@@ -512,6 +519,7 @@ public class HdfsSpout extends BaseRichSpout {
       }
     } finally {
       dirlock.release();
+      LOG.debug("Released DirLock {}, SpoutID {} ", dirlock.getLockFile(), 
spoutId);
     }
   }
 
@@ -583,10 +591,10 @@ public class HdfsSpout extends BaseRichSpout {
   private Path getFileForLockFile(Path lockFile, Path sourceDirPath)
           throws IOException {
     String lockFileName = lockFile.getName();
-    Path dataFile = new Path(sourceDirPath + lockFileName + inprogress_suffix);
+    Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + 
inprogress_suffix);
     if( hdfs.exists(dataFile) )
       return dataFile;
-    dataFile = new Path(sourceDirPath + lockFileName);
+    dataFile = new Path(sourceDirPath + Path.SEPARATOR +  lockFileName);
     if(hdfs.exists(dataFile))
       return dataFile;
     return null;

http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
index 5ff7b75..5edb4e5 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -150,6 +150,8 @@ public class SequenceFileReader<Key extends Writable,Value 
extends Writable>
 
     public Offset(String offset) {
       try {
+        if(offset==null)
+          throw new IllegalArgumentException("offset cannot be null");
         String[] parts = offset.split(",");
         this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1]);
         this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1]);
@@ -169,7 +171,7 @@ public class SequenceFileReader<Key extends Writable,Value 
extends Writable>
               "sync=" + lastSyncPoint +
               ":afterSync=" + recordsSinceLastSync +
               ":record=" + currentRecord +
-              '}';
+              ":}";
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index b998d30..cf04710 100644
--- 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -53,16 +53,18 @@ class TextFileReader extends AbstractFileReader {
     this(fs, file, conf, new TextFileReader.Offset(startOffset) );
   }
 
-  private TextFileReader(FileSystem fs, Path file, Map conf, 
TextFileReader.Offset startOffset) throws IOException {
+  private TextFileReader(FileSystem fs, Path file, Map conf, 
TextFileReader.Offset startOffset)
+          throws IOException {
     super(fs, file, new Fields(DEFAULT_FIELD_NAME));
     offset = startOffset;
     FSDataInputStream in = fs.open(file);
-    if(offset.byteOffset>0)
-      in.seek(offset.byteOffset);
 
     String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : 
conf.get(CHARSET).toString();
     int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? 
DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
     reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz);
+    if(offset.charOffset >0)
+      reader.skip(offset.charOffset);
+
   }
 
   public Offset getFileOffset() {
@@ -70,15 +72,31 @@ class TextFileReader extends AbstractFileReader {
   }
 
   public List<Object> next() throws IOException, ParseException {
-    String line =  reader.readLine();
+    String line = readLineAndTrackOffset(reader);
     if(line!=null) {
-      int strByteSize = line.getBytes().length;
-      offset.increment(strByteSize);
       return Collections.singletonList((Object) line);
     }
     return null;
   }
 
+  private String readLineAndTrackOffset(BufferedReader reader) throws 
IOException {
+    StringBuffer sb = new StringBuffer(1000);
+    long before = offset.charOffset;
+    int ch;
+    while( (ch = reader.read()) != -1 ) {
+      ++offset.charOffset;
+      if (ch == '\n') {
+        ++offset.lineNumber;
+        return sb.toString();
+      } else if( ch != '\r') {
+        sb.append((char)ch);
+      }
+    }
+    if(before==offset.charOffset) // reached EOF, didnt read anything
+      return null;
+    return sb.toString();
+  }
+
   @Override
   public void close() {
     try {
@@ -89,41 +107,41 @@ class TextFileReader extends AbstractFileReader {
   }
 
   public static class Offset implements FileOffset {
-    long byteOffset;
+    long charOffset;
     long lineNumber;
 
     public Offset(long byteOffset, long lineNumber) {
-      this.byteOffset = byteOffset;
+      this.charOffset = byteOffset;
       this.lineNumber = lineNumber;
     }
 
     public Offset(String offset) {
-      if(offset!=null)
+      if(offset==null)
         throw new IllegalArgumentException("offset cannot be null");
       try {
         String[] parts = offset.split(":");
-        this.byteOffset = Long.parseLong(parts[0].split("=")[1]);
+        this.charOffset = Long.parseLong(parts[0].split("=")[1]);
         this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
       } catch (Exception e) {
         throw new IllegalArgumentException("'" + offset +
                 "' cannot be interpreted. It is not in expected format for 
TextFileReader." +
-                " Format e.g.  {byte=123:line=5}");
+                " Format e.g.  {char=123:line=5}");
       }
     }
 
     @Override
     public String toString() {
       return '{' +
-              "byte=" + byteOffset +
+              "char=" + charOffset +
               ":line=" + lineNumber +
-              '}';
+              ":}";
     }
 
     @Override
     public boolean isNextOffset(FileOffset rhs) {
       if(rhs instanceof Offset) {
         Offset other = ((Offset) rhs);
-        return  other.byteOffset > byteOffset    &&
+        return  other.charOffset > charOffset &&
                 other.lineNumber == lineNumber+1;
       }
       return false;
@@ -146,26 +164,21 @@ class TextFileReader extends AbstractFileReader {
 
       Offset that = (Offset) o;
 
-      if (byteOffset != that.byteOffset)
+      if (charOffset != that.charOffset)
         return false;
       return lineNumber == that.lineNumber;
     }
 
     @Override
     public int hashCode() {
-      int result = (int) (byteOffset ^ (byteOffset >>> 32));
+      int result = (int) (charOffset ^ (charOffset >>> 32));
       result = 31 * result + (int) (lineNumber ^ (lineNumber >>> 32));
       return result;
     }
 
-    void increment(int delta) {
-      ++lineNumber;
-      byteOffset += delta;
-    }
-
     @Override
     public Offset clone() {
-      return new Offset(byteOffset, lineNumber);
+      return new Offset(charOffset, lineNumber);
     }
   } //class Offset
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
index a97b3f2..7995248 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -314,7 +314,7 @@ public class TestFileLock {
     }
   }
 
-  private void closeUnderlyingLockFile(FileLock lock) throws 
ReflectiveOperationException {
+  public static void closeUnderlyingLockFile(FileLock lock) throws 
ReflectiveOperationException {
     Method m = FileLock.class.getDeclaredMethod("forceCloseLockFile");
     m.setAccessible(true);
     m.invoke(lock);

http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index f64400a..1279f06 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -21,6 +21,7 @@ package org.apache.storm.hdfs.spout;
 import backtype.storm.Config;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -74,7 +75,7 @@ public class TestHdfsSpout {
 
   static MiniDFSCluster.Builder builder;
   static MiniDFSCluster hdfsCluster;
-  static FileSystem fs;
+  static DistributedFileSystem fs;
   static String hdfsURI;
   static Configuration conf = new Configuration();
 
@@ -156,6 +157,49 @@ public class TestHdfsSpout {
     checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
   }
 
+  @Test
+  public void testResumeAbandoned_Text_NoAck() throws Exception {
+    Path file1 = new Path(source.toString() + "/file1.txt");
+    createTextFile(file1, 6);
+
+    final Integer lockExpirySec = 1;
+    Map conf = getDefaultConfig();
+    conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+    conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // basically disable it
+    conf.put(Configs.LOCK_TIMEOUT, lockExpirySec.toString());
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+    HdfsSpout spout2 = makeSpout(1, conf, Configs.TEXT);
+
+    // consume file 1 partially
+    List<String> res = runSpout(spout, "r2");
+    Assert.assertEquals(2, res.size());
+    // abandon file
+    FileLock lock = getField(spout, "lock");
+    TestFileLock.closeUnderlyingLockFile(lock);
+    Thread.sleep(lockExpirySec * 2 * 1000);
+
+    // check lock file presence
+    Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+    // create another spout to take over processing and read a few lines
+    List<String> res2 = runSpout(spout2, "r3");
+    Assert.assertEquals(3, res2.size());
+
+    // check lock file presence
+    Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+    // check lock file contents
+    List<String> contents = readTextFile(fs, lock.getLockFile().toString());
+    System.err.println(contents);
+
+    // finish up reading the file
+    res2 = runSpout(spout2, "r2");
+    Assert.assertEquals(4, res2.size());
+
+    // check lock file is gone
+    Assert.assertFalse(fs.exists(lock.getLockFile()));
+  }
+
   private void checkCollectorOutput_txt(MockCollector collector, Path... 
txtFiles) throws IOException {
     ArrayList<String> expected = new ArrayList<>();
     for (Path txtFile : txtFiles) {
@@ -183,6 +227,7 @@ public class TestHdfsSpout {
     return result;
   }
 
+
   private void checkCollectorOutput_seq(MockCollector collector, Path... 
seqFiles) throws IOException {
     ArrayList<String> expected = new ArrayList<>();
     for (Path seqFile : seqFiles) {
@@ -515,8 +560,12 @@ public class TestHdfsSpout {
 
   private void createTextFile(Path file, int lineCount) throws IOException {
     FSDataOutputStream os = fs.create(file);
+    int size = 0;
     for (int i = 0; i < lineCount; i++) {
       os.writeBytes("line " + i + System.lineSeparator());
+      String msg = "line " + i + System.lineSeparator();
+      System.err.print(size +  "-" + msg);
+      size += msg.getBytes().length;
     }
     os.close();
   }

http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
index 59aad25..0bb44af 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
@@ -60,7 +60,7 @@ public class TestProgressTracker {
 
     TextFileReader.Offset currOffset = reader.getFileOffset();
     Assert.assertNotNull(currOffset);
-    Assert.assertEquals(0, currOffset.byteOffset);
+    Assert.assertEquals(0, currOffset.charOffset);
 
     // read 1st line and ack
     Assert.assertNotNull(reader.next());

Reply via email to