Fix SeqFileReader resume behavior for abandoned files; added a unit test.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e50b639a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e50b639a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e50b639a

Branch: refs/heads/1.x-branch
Commit: e50b639add0cba65dc02b91553af6a9a4e4e5295
Parents: 721c9b3
Author: Roshan Naik <[email protected]>
Authored: Tue Dec 22 20:01:35 2015 -0800
Committer: Roshan Naik <[email protected]>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |  3 +-
 .../storm/hdfs/spout/SequenceFileReader.java    | 35 ++++++++++----
 .../apache/storm/hdfs/spout/TextFileReader.java | 11 +++--
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 50 ++++++++++++++++++++
 4 files changed, 85 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e50b639a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 5a6adf8..fdb48b4 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -130,10 +130,11 @@ public class HdfsSpout extends BaseRichSpout {
         // 3) Select a new file if one is not open already
         if (reader == null) {
           reader = pickNextFile();
-          fileReadCompletely=false;
           if (reader == null) {
             LOG.debug("Currently no new files to process under : " + 
sourceDirPath);
             return;
+          } else {
+            fileReadCompletely=false;
           }
         }
         if( fileReadCompletely ) { // wait for more ACKs before proceeding

http://git-wip-us.apache.org/repos/asf/storm/blob/e50b639a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
index 5edb4e5..308d1c6 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -76,9 +76,16 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable>
     int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : 
Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
     this.offset = new SequenceFileReader.Offset(offset);
     this.reader = new SequenceFile.Reader(fs.getConf(),  
SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) );
-    this.reader.sync(this.offset.lastSyncPoint);
     this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), 
fs.getConf() );
     this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), 
fs.getConf() );
+    skipToOffset(this.reader, this.offset, this.key);
+  }
+
+  private static <K> void skipToOffset(SequenceFile.Reader reader, Offset 
offset, K key) throws IOException {
+    reader.sync(offset.lastSyncPoint);
+    for(int i=0; i<offset.recordsSinceLastSync; ++i) {
+      reader.next(key);
+    }
   }
 
   public String getKeyName() {
@@ -129,9 +136,9 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable>
 
 
   public static class Offset implements  FileOffset {
-    private long lastSyncPoint;
-    private long recordsSinceLastSync;
-    private long currentRecord;
+    public long lastSyncPoint;
+    public long recordsSinceLastSync;
+    public long currentRecord;
     private long currRecordEndOffset;
     private long prevRecordEndOffset;
 
@@ -152,12 +159,20 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable>
       try {
         if(offset==null)
           throw new IllegalArgumentException("offset cannot be null");
-        String[] parts = offset.split(",");
-        this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1]);
-        this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1]);
-        this.currentRecord = Long.parseLong(parts[2].split("=")[1]);
-        this.prevRecordEndOffset = 0;
-        this.currRecordEndOffset = 0;
+        if(offset.equalsIgnoreCase("0")) {
+          this.lastSyncPoint = 0;
+          this.recordsSinceLastSync = 0;
+          this.currentRecord = 0;
+          this.prevRecordEndOffset = 0;
+          this.currRecordEndOffset = 0;
+        } else {
+          String[] parts = offset.split(":");
+          this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1]);
+          this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1]);
+          this.currentRecord = Long.parseLong(parts[2].split("=")[1]);
+          this.prevRecordEndOffset = 0;
+          this.currRecordEndOffset = 0;
+        }
       } catch (Exception e) {
         throw new IllegalArgumentException("'" + offset +
                 "' cannot be interpreted. It is not in expected format for 
SequenceFileReader." +

http://git-wip-us.apache.org/repos/asf/storm/blob/e50b639a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index cf04710..fdea42a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -119,9 +119,14 @@ class TextFileReader extends AbstractFileReader {
       if(offset==null)
         throw new IllegalArgumentException("offset cannot be null");
       try {
-        String[] parts = offset.split(":");
-        this.charOffset = Long.parseLong(parts[0].split("=")[1]);
-        this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
+        if(offset.equalsIgnoreCase("0")) {
+          this.charOffset = 0;
+          this.lineNumber = 0;
+        } else {
+          String[] parts = offset.split(":");
+          this.charOffset = Long.parseLong(parts[0].split("=")[1]);
+          this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
+        }
       } catch (Exception e) {
         throw new IllegalArgumentException("'" + offset +
                 "' cannot be interpreted. It is not in expected format for 
TextFileReader." +

http://git-wip-us.apache.org/repos/asf/storm/blob/e50b639a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 1279f06..203a63b 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -198,6 +198,56 @@ public class TestHdfsSpout {
 
     // check lock file is gone
     Assert.assertFalse(fs.exists(lock.getLockFile()));
+    FileReader rdr = getField(spout2, "reader");
+    Assert.assertNull(rdr);
+    Assert.assertTrue(getBoolField(spout2, "fileReadCompletely"));
+
+  }
+
+  @Test
+  public void testResumeAbandoned_Seq_NoAck() throws Exception {
+    Path file1 = new Path(source.toString() + "/file1.seq");
+    createSeqFile(fs, file1, 6);
+
+    final Integer lockExpirySec = 1;
+    Map conf = getDefaultConfig();
+    conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+    conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // basically disable it
+    conf.put(Configs.LOCK_TIMEOUT, lockExpirySec.toString());
+    HdfsSpout spout = makeSpout(0, conf, Configs.SEQ);
+    HdfsSpout spout2 = makeSpout(1, conf, Configs.SEQ);
+
+    // consume file 1 partially
+    List<String> res = runSpout(spout, "r2");
+    Assert.assertEquals(2, res.size());
+    // abandon file
+    FileLock lock = getField(spout, "lock");
+    TestFileLock.closeUnderlyingLockFile(lock);
+    Thread.sleep(lockExpirySec * 2 * 1000);
+
+    // check lock file presence
+    Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+    // create another spout to take over processing and read a few lines
+    List<String> res2 = runSpout(spout2, "r3");
+    Assert.assertEquals(3, res2.size());
+
+    // check lock file presence
+    Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+    // check lock file contents
+    List<String> contents = getTextFileContents(fs, lock.getLockFile());
+    System.err.println(contents);
+
+    // finish up reading the file
+    res2 = runSpout(spout2, "r3");
+    Assert.assertEquals(4, res2.size());
+
+    // check lock file is gone
+    Assert.assertFalse(fs.exists(lock.getLockFile()));
+    FileReader rdr = getField(spout2, "reader");
+    Assert.assertNull( rdr );
+    Assert.assertTrue(getBoolField(spout2, "fileReadCompletely"));
   }
 
   private void checkCollectorOutput_txt(MockCollector collector, Path... 
txtFiles) throws IOException {

Reply via email to