fixing SeqFileReader resume behavior for abandoned files. Added UT

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e50b639a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e50b639a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e50b639a

Branch: refs/heads/1.x-branch
Commit: e50b639add0cba65dc02b91553af6a9a4e4e5295
Parents: 721c9b3
Author: Roshan Naik <[email protected]>
Authored: Tue Dec 22 20:01:35 2015 -0800
Committer: Roshan Naik <[email protected]>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |  3 +-
 .../storm/hdfs/spout/SequenceFileReader.java    | 35 ++++++++++----
 .../apache/storm/hdfs/spout/TextFileReader.java | 11 +++--
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 50 ++++++++++++++++++++
 4 files changed, 85 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e50b639a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 5a6adf8..fdb48b4 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -130,10 +130,11 @@ public class HdfsSpout extends BaseRichSpout {
     // 3) Select a new file if one is not open already
     if (reader == null) {
       reader = pickNextFile();
-      fileReadCompletely=false;
       if (reader == null) {
         LOG.debug("Currently no new files to process under : " + sourceDirPath);
         return;
+      } else {
+        fileReadCompletely=false;
       }
     }
     if( fileReadCompletely ) { // wait for more ACKs before proceeding

http://git-wip-us.apache.org/repos/asf/storm/blob/e50b639a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
index 5edb4e5..308d1c6 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -76,9 +76,16 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable>
     int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
     this.offset = new SequenceFileReader.Offset(offset);
     this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) );
-    this.reader.sync(this.offset.lastSyncPoint);
     this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf() );
     this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf() );
+    skipToOffset(this.reader, this.offset, this.key);
+  }
+
+  private static <K> void skipToOffset(SequenceFile.Reader reader, Offset offset, K key) throws IOException {
+    reader.sync(offset.lastSyncPoint);
+    for(int i=0; i<offset.recordsSinceLastSync; ++i) {
+      reader.next(key);
+    }
   }
 
   public String getKeyName() {
@@ -129,9 +136,9 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable>
 
   public static class Offset implements FileOffset {
-    private long lastSyncPoint;
-    private long recordsSinceLastSync;
-    private long currentRecord;
+    public long lastSyncPoint;
+    public long recordsSinceLastSync;
+    public long currentRecord;
     private long currRecordEndOffset;
     private long prevRecordEndOffset;
 
@@ -152,12 +159,20 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable>
       try {
         if(offset==null)
           throw new IllegalArgumentException("offset cannot be null");
-        String[] parts = offset.split(",");
-        this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1]);
-        this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1]);
-        this.currentRecord = Long.parseLong(parts[2].split("=")[1]);
-        this.prevRecordEndOffset = 0;
-        this.currRecordEndOffset = 0;
+        if(offset.equalsIgnoreCase("0")) {
+          this.lastSyncPoint = 0;
+          this.recordsSinceLastSync = 0;
+          this.currentRecord = 0;
+          this.prevRecordEndOffset = 0;
+          this.currRecordEndOffset = 0;
+        } else {
+          String[] parts = offset.split(":");
+          this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1]);
+          this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1]);
+          this.currentRecord = Long.parseLong(parts[2].split("=")[1]);
+          this.prevRecordEndOffset = 0;
+          this.currRecordEndOffset = 0;
+        }
       } catch (Exception e) {
         throw new IllegalArgumentException("'" + offset + "' cannot be interpreted. It is not in expected format for SequenceFileReader."
                 +

http://git-wip-us.apache.org/repos/asf/storm/blob/e50b639a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index cf04710..fdea42a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -119,9 +119,14 @@ class TextFileReader extends AbstractFileReader {
       if(offset==null)
         throw new IllegalArgumentException("offset cannot be null");
       try {
-        String[] parts = offset.split(":");
-        this.charOffset = Long.parseLong(parts[0].split("=")[1]);
-        this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
+        if(offset.equalsIgnoreCase("0")) {
+          this.charOffset = 0;
+          this.lineNumber = 0;
+        } else {
+          String[] parts = offset.split(":");
+          this.charOffset = Long.parseLong(parts[0].split("=")[1]);
+          this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
+        }
       } catch (Exception e) {
         throw new IllegalArgumentException("'" + offset + "' cannot be interpreted. It is not in expected format for TextFileReader."
                 +

http://git-wip-us.apache.org/repos/asf/storm/blob/e50b639a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 1279f06..203a63b 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -198,6 +198,56 @@ public class TestHdfsSpout {
 
     // check lock file is gone
     Assert.assertFalse(fs.exists(lock.getLockFile()));
+    FileReader rdr = getField(spout2, "reader");
+    Assert.assertNull(rdr);
+    Assert.assertTrue(getBoolField(spout2, "fileReadCompletely"));
+
+  }
+
+  @Test
+  public void testResumeAbandoned_Seq_NoAck() throws Exception {
+    Path file1 = new Path(source.toString() + "/file1.seq");
+    createSeqFile(fs, file1, 6);
+
+    final Integer lockExpirySec = 1;
+    Map conf = getDefaultConfig();
+    conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+    conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // basically disable it
+    conf.put(Configs.LOCK_TIMEOUT, lockExpirySec.toString());
+    HdfsSpout spout = makeSpout(0, conf, Configs.SEQ);
+    HdfsSpout spout2 = makeSpout(1, conf, Configs.SEQ);
+
+    // consume file 1 partially
+    List<String> res = runSpout(spout, "r2");
+    Assert.assertEquals(2, res.size());
+    // abandon file
+    FileLock lock = getField(spout, "lock");
+    TestFileLock.closeUnderlyingLockFile(lock);
+    Thread.sleep(lockExpirySec * 2 * 1000);
+
+    // check lock file presence
+    Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+    // create another spout to take over processing and read a few lines
+    List<String> res2 = runSpout(spout2, "r3");
+    Assert.assertEquals(3, res2.size());
+
+    // check lock file presence
+    Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+    // check lock file contents
+    List<String> contents = getTextFileContents(fs, lock.getLockFile());
+    System.err.println(contents);
+
+    // finish up reading the file
+    res2 = runSpout(spout2, "r3");
+    Assert.assertEquals(4, res2.size());
+
+    // check lock file is gone
+    Assert.assertFalse(fs.exists(lock.getLockFile()));
+    FileReader rdr = getField(spout2, "reader");
+    Assert.assertNull( rdr );
+    Assert.assertTrue(getBoolField(spout2, "fileReadCompletely"));
   }
 
   private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException {
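
For reference, a minimal standalone sketch (not part of the patch) of the resume technique the SequenceFileReader change above relies on: a sequence file cannot be repositioned at an arbitrary byte offset, so the reader seeks to the last sync marker and then replays the records consumed since that sync point. The class name, file path, and the two offset values below are illustrative assumptions, not values taken from this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SeqFileResumeSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/file1.seq");      // hypothetical sequence file
    long lastSyncPoint = 0L;                     // values the spout would normally
    long recordsSinceLastSync = 2L;              // recover from the expired lock file

    SequenceFile.Reader reader =
        new SequenceFile.Reader(conf, SequenceFile.Reader.file(file));
    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);

    reader.sync(lastSyncPoint);                  // jump to the last known sync marker
    for (int i = 0; i < recordsSinceLastSync; ++i) {
      reader.next(key);                          // replay records already emitted
    }
    // the reader is now positioned at the first unread record
    System.out.println("resuming at byte position " + reader.getPosition());
    reader.close();
  }
}

Skipping forward record by record from the sync marker is what lets a second spout pick up an abandoned file at the record recorded in the expired lock, which is the scenario the new testResumeAbandoned_Seq_NoAck test exercises.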
