Fixing TextFileReader resume-of-abandoned-file functionality. Added unit test.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/721c9b3d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/721c9b3d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/721c9b3d

Branch: refs/heads/1.x-branch
Commit: 721c9b3d1a7a47cda19ca4867a584626c84823f4
Parents: 1e52f08
Author: Roshan Naik <[email protected]>
Authored: Tue Dec 22 19:24:42 2015 -0800
Committer: Roshan Naik <[email protected]>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/common/HdfsUtils.java |  1 +
 .../org/apache/storm/hdfs/spout/DirLock.java    |  9 +++-
 .../org/apache/storm/hdfs/spout/FileLock.java   | 34 +++++++-----
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 40 ++++++++------
 .../storm/hdfs/spout/SequenceFileReader.java    |  4 +-
 .../apache/storm/hdfs/spout/TextFileReader.java | 57 ++++++++++++--------
 .../apache/storm/hdfs/spout/TestFileLock.java   |  2 +-
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 51 +++++++++++++++++-
 .../storm/hdfs/spout/TestProgressTracker.java   |  2 +-
 9 files changed, 144 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index 86b9ee8..5ec5333 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.ipc.RemoteException;


http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index 06ca749..0e1182f 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -21,6 +21,7 @@ package org.apache.storm.hdfs.spout;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.storm.hdfs.common.HdfsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,8 +102,14 @@ public class DirLock {
     }
   }
-
   private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) throws IOException {
+    if(fs instanceof DistributedFileSystem) {
+      if (!((DistributedFileSystem) fs).recoverLease(dirLockFile)) {
+        log.warn("Unable to recover lease on dir lock file " + dirLockFile + " right now. Cannot transfer ownership. Will need to try later.");
+        return null;
+      }
+    }
+    // delete and recreate lock file
     if( fs.delete(dirLockFile, false) ) { // returns false if somebody else already deleted it (to take ownership)
       FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, dirLockFile);


http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index 89ed855..c64336d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.storm.hdfs.common.HdfsUtils;
@@ -65,7 +66,7 @@ public class FileLock {
     this.lockFile = lockFile;
     this.lockFileStream = fs.append(lockFile);
     this.componentID = spoutId;
-    log.debug("Acquired abandoned lockFile {}", lockFile);
+    log.debug("Acquired abandoned lockFile {}, Spout {}", lockFile, spoutId);
     logProgress(entry.fileOffset, true);
   }
 
@@ -95,13 +96,13 @@ public class FileLock {
   public void release() throws IOException {
     lockFileStream.close();
     if(!fs.delete(lockFile, false)){
-      log.warn("Unable to delete lock file");
+      log.warn("Unable to delete lock file, Spout = {}", componentID);
       throw new IOException("Unable to delete lock file");
     }
-    log.debug("Released lock file {}", lockFile);
+    log.debug("Released lock file {}. Spout {}", lockFile, componentID);
   }
 
-  // for testing only.. invoked via reflection
+  // For testing only.. invoked via reflection
   private void forceCloseLockFile() throws IOException {
     lockFileStream.close();
   }
@@ -115,14 +116,14 @@
     try {
       FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
       if (ostream != null) {
-        log.debug("Acquired lock on file {}. LockFile=", fileToLock, lockFile);
+        log.debug("Acquired lock on file {}. LockFile= {}, Spout = {}", fileToLock, lockFile, spoutId);
         return new FileLock(fs, lockFile, ostream, spoutId);
       } else {
-        log.debug("Cannot lock file {} as its already locked.", fileToLock);
+        log.debug("Cannot lock file {} as its already locked. Spout = {}", fileToLock, spoutId);
         return null;
       }
     } catch (IOException e) {
-      log.error("Error when acquiring lock on file " + fileToLock, e);
+      log.error("Error when acquiring lock on file " + fileToLock + " Spout = " + spoutId, e);
       throw e;
     }
   }
@@ -147,7 +148,6 @@ public class FileLock {
       if(lastEntry==null) {
         throw new RuntimeException(lockFile.getName() + " is empty. this file is invalid.");
       }
-      log.error("{} , lastModified= {}, expiryTime= {}, diff= {}", lockFile, lastEntry.eventTime, olderThan, lastEntry.eventTime-olderThan );
       if( lastEntry.eventTime <= olderThan )
         return lastEntry;
     }
@@ -181,17 +181,25 @@
    *                 file is stale. so this value comes from that earlier scan.
    * @param spoutId spout id
    * @throws IOException if unable to acquire
-   * @return null if lock File is being used by another thread
+   * @return null if lock File is not recoverable
    */
   public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId)
           throws IOException {
     try {
+      if(fs instanceof DistributedFileSystem ) {
+        if( !((DistributedFileSystem) fs).recoverLease(lockFile) ) {
+          log.warn("Unable to recover lease on lock file {} right now. Cannot transfer ownership. Will need to try later. Spout = {}" , lockFile , spoutId);
+          return null;
+        }
+      }
       return new FileLock(fs, lockFile, spoutId, lastEntry);
-    } catch (RemoteException e) {
-      if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
-        log.warn("Lock file {} is currently open. Cannot transfer ownership now. Will try later.", lockFile);
+    } catch (IOException e) {
+      if (e instanceof RemoteException &&
+              ((RemoteException) e).unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
+        log.warn("Lock file " + lockFile + "is currently open. Cannot transfer ownership now. Will need to try later. Spout= " + spoutId, e);
         return null;
       } else { // unexpected error
+        log.warn("Cannot transfer ownership now for lock file " + lockFile + ". Will need to try later. Spout =" + spoutId, e);
         throw e;
       }
     }
@@ -226,7 +234,7 @@ public class FileLock {
       }
     }
     if(listing.isEmpty())
-      log.info("No abandoned lock files found");
+      log.info("No abandoned lock files found by Spout {}", spoutId);
     return null;
   }


http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 3d95ea7..5a6adf8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -107,7 +107,7 @@ public class HdfsSpout extends BaseRichSpout {
   }
 
   public void nextTuple() {
-    LOG.debug("Next Tuple");
+    LOG.debug("Next Tuple {}", spoutId);
     // 1) First re-emit any previously failed tuples (from retryList)
     if (!retryList.isEmpty()) {
       LOG.debug("Sending from retry list");
@@ -118,8 +118,8 @@
     if( ackEnabled && tracker.size()>=maxDuplicates ) {
       LOG.warn("Waiting for more ACKs before generating new tuples. " +
-              "Progress tracker size has reached limit {}"
-              , maxDuplicates);
+              "Progress tracker size has reached limit {}, SpoutID {}"
+              , maxDuplicates, spoutId);
       // Don't emit anything .. allow configured spout wait strategy to kick in
       return;
     }
@@ -172,8 +172,7 @@
         // spout wait strategy (due to no emits). Instead we go back into the loop and
         // generate a tuple from next file
       }
-    }
-
+    } // while
   }
 
   // will commit progress into lock file if commit threshold is reached
@@ -187,7 +186,7 @@
       commitTimeElapsed.set(false);
       setupCommitElapseTimer();
     } catch (IOException e) {
-      LOG.error("Unable to commit progress Will retry later.", e);
+      LOG.error("Unable to commit progress Will retry later. Spout ID = " + spoutId, e);
     }
   }
 }
@@ -212,9 +211,9 @@
   private void markFileAsDone(Path filePath) {
     try {
       Path newFile = renameCompletedFile(reader.getFilePath());
-      LOG.info("Completed processing {}", newFile);
+      LOG.info("Completed processing {}. Spout Id = {} ", newFile, spoutId);
     } catch (IOException e) {
-      LOG.error("Unable to archive completed file" + filePath, e);
+      LOG.error("Unable to archive completed file" + filePath + " Spout ID " + spoutId, e);
     }
     closeReaderAndResetTrackers();
   }
@@ -225,13 +224,13 @@
     String originalName = new Path(fileNameMinusSuffix).getName();
     Path newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName);
 
-    LOG.info("Moving bad file {} to {}. Processed it till offset {}", originalName, newFile, tracker.getCommitPosition());
+    LOG.info("Moving bad file {} to {}. Processed it till offset {}. SpoutID= {}", originalName, newFile, tracker.getCommitPosition(), spoutId);
     try {
       if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception
         throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception
       }
     } catch (IOException e) {
-      LOG.warn("Error moving bad file: " + file + " to destination " + newFile, e);
+      LOG.warn("Error moving bad file: " + file + " to destination " + newFile + " SpoutId =" + spoutId, e);
     }
     closeReaderAndResetTrackers();
   }
@@ -245,8 +244,9 @@
       reader = null;
       try {
         lock.release();
+        LOG.debug("Spout {} released FileLock. SpoutId = {}", lock.getLockFile(), spoutId);
       } catch (IOException e) {
-        LOG.error("Unable to delete lock file : " + this.lock.getLockFile(), e);
+        LOG.error("Unable to delete lock file : " + this.lock.getLockFile() + " SpoutId =" + spoutId, e);
       }
       lock = null;
     }
@@ -260,7 +260,7 @@
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
     this.conf = conf;
     final String FILE_SYSTEM = "filesystem";
-    LOG.info("Opening HDFS Spout");
+    LOG.info("Opening HDFS Spout {}", spoutId);
     this.collector = collector;
     this.hdfsConfig = new Configuration();
     this.tupleCounter = 0;
@@ -437,6 +437,7 @@
       // 1) If there are any abandoned files, pick oldest one
       lock = getOldestExpiredLock();
       if (lock != null) {
+        LOG.debug("Spout {} now took over ownership of abandoned FileLock {}" , spoutId, lock.getLockFile());
         Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath);
         String resumeFromOffset = lock.getLastLogEntry().fileOffset;
         LOG.info("Resuming processing of abandoned file : {}", file);
@@ -454,7 +455,7 @@
 
         lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
         if( lock==null ) {
-          LOG.debug("Unable to get lock, so skipping file: {}", file);
+          LOG.debug("Unable to get FileLock, so skipping file: {}", file);
           continue; // could not lock, so try another file.
         }
         LOG.info("Processing : {} ", file);
@@ -480,9 +481,15 @@
     DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath);
     if (dirlock == null) {
       dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, lockTimeoutSec);
-      if (dirlock == null)
+      if (dirlock == null) {
+        LOG.debug("Spout {} could not take over ownership of DirLock for {}" , spoutId, lockDirPath);
         return null;
+      }
+      LOG.debug("Spout {} now took over ownership of abandoned DirLock for {}" , spoutId, lockDirPath);
+    } else {
+      LOG.debug("Spout {} now owns DirLock for {}", spoutId, lockDirPath);
     }
+
     try {
       // 2 - if clocks are in sync then simply take ownership of the oldest expired lock
       if (clocksInSync)
@@ -512,6 +519,7 @@
       }
     } finally {
       dirlock.release();
+      LOG.debug("Released DirLock {}, SpoutID {} ", dirlock.getLockFile(), spoutId);
     }
   }
@@ -583,10 +591,10 @@
 
   private Path getFileForLockFile(Path lockFile, Path sourceDirPath)
           throws IOException {
     String lockFileName = lockFile.getName();
-    Path dataFile = new Path(sourceDirPath + lockFileName + inprogress_suffix);
+    Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogress_suffix);
     if( hdfs.exists(dataFile) )
       return dataFile;
-    dataFile = new Path(sourceDirPath + lockFileName);
+    dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName);
     if(hdfs.exists(dataFile))
       return dataFile;
     return null;


http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
index 5ff7b75..5edb4e5 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -150,6 +150,8 @@ public class SequenceFileReader<Key extends Writable,Value extends Writable>
 
     public Offset(String offset) {
       try {
+        if(offset==null)
+          throw new IllegalArgumentException("offset cannot be null");
         String[] parts = offset.split(",");
         this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1]);
         this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1]);
@@ -169,7 +171,7 @@
               "sync=" + lastSyncPoint +
               ":afterSync=" + recordsSinceLastSync +
               ":record=" + currentRecord +
-              '}';
+              ":}";
     }
 
     @Override


http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index b998d30..cf04710 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -53,16 +53,18 @@ class TextFileReader extends AbstractFileReader {
     this(fs, file, conf, new TextFileReader.Offset(startOffset) );
   }
 
-  private TextFileReader(FileSystem fs, Path file, Map conf, TextFileReader.Offset startOffset) throws IOException {
+  private TextFileReader(FileSystem fs, Path file, Map conf, TextFileReader.Offset startOffset)
+          throws IOException {
     super(fs, file, new Fields(DEFAULT_FIELD_NAME));
     offset = startOffset;
     FSDataInputStream in = fs.open(file);
-    if(offset.byteOffset>0)
-      in.seek(offset.byteOffset);
 
     String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString();
     int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
     reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz);
+    if(offset.charOffset >0)
+      reader.skip(offset.charOffset);
+
   }
 
   public Offset getFileOffset() {
@@ -70,15 +72,31 @@
   }
 
   public List<Object> next() throws IOException, ParseException {
-    String line = reader.readLine();
+    String line = readLineAndTrackOffset(reader);
     if(line!=null) {
-      int strByteSize = line.getBytes().length;
-      offset.increment(strByteSize);
       return Collections.singletonList((Object) line);
     }
     return null;
   }
 
+  private String readLineAndTrackOffset(BufferedReader reader) throws IOException {
+    StringBuffer sb = new StringBuffer(1000);
+    long before = offset.charOffset;
+    int ch;
+    while( (ch = reader.read()) != -1 ) {
+      ++offset.charOffset;
+      if (ch == '\n') {
+        ++offset.lineNumber;
+        return sb.toString();
+      } else if( ch != '\r') {
+        sb.append((char)ch);
+      }
+    }
+    if(before==offset.charOffset) // reached EOF, didnt read anything
+      return null;
+    return sb.toString();
+  }
+
   @Override
   public void close() {
     try {
@@ -89,41 +107,41 @@
   }
 
   public static class Offset implements FileOffset {
-    long byteOffset;
+    long charOffset;
     long lineNumber;
 
     public Offset(long byteOffset, long lineNumber) {
-      this.byteOffset = byteOffset;
+      this.charOffset = byteOffset;
       this.lineNumber = lineNumber;
     }
 
     public Offset(String offset) {
-      if(offset!=null)
+      if(offset==null)
        throw new IllegalArgumentException("offset cannot be null");
      try {
        String[] parts = offset.split(":");
-        this.byteOffset = Long.parseLong(parts[0].split("=")[1]);
+        this.charOffset = Long.parseLong(parts[0].split("=")[1]);
        this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
      } catch (Exception e) {
        throw new IllegalArgumentException("'" + offset + "' cannot be interpreted. It is not in expected format for TextFileReader." +
-                " Format e.g. {byte=123:line=5}");
+                " Format e.g. {char=123:line=5}");
       }
     }
 
     @Override
     public String toString() {
       return '{' +
-              "byte=" + byteOffset +
+              "char=" + charOffset +
               ":line=" + lineNumber +
-              '}';
+              ":}";
     }
 
     @Override
     public boolean isNextOffset(FileOffset rhs) {
       if(rhs instanceof Offset) {
         Offset other = ((Offset) rhs);
-        return other.byteOffset > byteOffset &&
+        return other.charOffset > charOffset &&
                other.lineNumber == lineNumber+1;
       }
       return false;
@@ -146,26 +164,21 @@
 
       Offset that = (Offset) o;
 
-      if (byteOffset != that.byteOffset)
+      if (charOffset != that.charOffset)
         return false;
       return lineNumber == that.lineNumber;
     }
 
     @Override
     public int hashCode() {
-      int result = (int) (byteOffset ^ (byteOffset >>> 32));
+      int result = (int) (charOffset ^ (charOffset >>> 32));
       result = 31 * result + (int) (lineNumber ^ (lineNumber >>> 32));
       return result;
     }
 
-    void increment(int delta) {
-      ++lineNumber;
-      byteOffset += delta;
-    }
-
     @Override
     public Offset clone() {
-      return new Offset(byteOffset, lineNumber);
+      return new Offset(charOffset, lineNumber);
     }
   } //class Offset
 }


http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
index a97b3f2..7995248 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -314,7 +314,7 @@ public class TestFileLock {
     }
   }
 
-  private void closeUnderlyingLockFile(FileLock lock) throws ReflectiveOperationException {
+  public static void closeUnderlyingLockFile(FileLock lock) throws ReflectiveOperationException {
     Method m = FileLock.class.getDeclaredMethod("forceCloseLockFile");
     m.setAccessible(true);
     m.invoke(lock);


http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index f64400a..1279f06 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -21,6 +21,7 @@ package org.apache.storm.hdfs.spout;
 import backtype.storm.Config;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -74,7 +75,7 @@
   static MiniDFSCluster.Builder builder;
   static MiniDFSCluster hdfsCluster;
-  static FileSystem fs;
+  static DistributedFileSystem fs;
   static String hdfsURI;
   static Configuration conf = new Configuration();
@@ -156,6 +157,49 @@
     checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
   }
 
+  @Test
+  public void testResumeAbandoned_Text_NoAck() throws Exception {
+    Path file1 = new Path(source.toString() + "/file1.txt");
+    createTextFile(file1, 6);
+
+    final Integer lockExpirySec = 1;
+    Map conf = getDefaultConfig();
+    conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+    conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // basically disable it
+    conf.put(Configs.LOCK_TIMEOUT, lockExpirySec.toString());
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+    HdfsSpout spout2 = makeSpout(1, conf, Configs.TEXT);
+
+    // consume file 1 partially
+    List<String> res = runSpout(spout, "r2");
+    Assert.assertEquals(2, res.size());
+
+    // abandon file
+    FileLock lock = getField(spout, "lock");
+    TestFileLock.closeUnderlyingLockFile(lock);
+    Thread.sleep(lockExpirySec * 2 * 1000);
+
+    // check lock file presence
+    Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+    // create another spout to take over processing and read a few lines
+    List<String> res2 = runSpout(spout2, "r3");
+    Assert.assertEquals(3, res2.size());
+
+    // check lock file presence
+    Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+    // check lock file contents
+    List<String> contents = readTextFile(fs, lock.getLockFile().toString());
+    System.err.println(contents);
+
+    // finish up reading the file
+    res2 = runSpout(spout2, "r2");
+    Assert.assertEquals(4, res2.size());
+
+    // check lock file is gone
+    Assert.assertFalse(fs.exists(lock.getLockFile()));
+  }
+
   private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException {
     ArrayList<String> expected = new ArrayList<>();
     for (Path txtFile : txtFiles) {
@@ -183,6 +227,7 @@
     return result;
   }
 
+
   private void checkCollectorOutput_seq(MockCollector collector, Path... seqFiles) throws IOException {
     ArrayList<String> expected = new ArrayList<>();
     for (Path seqFile : seqFiles) {
@@ -515,8 +560,12 @@
 
   private void createTextFile(Path file, int lineCount) throws IOException {
     FSDataOutputStream os = fs.create(file);
+    int size = 0;
     for (int i = 0; i < lineCount; i++) {
       os.writeBytes("line " + i + System.lineSeparator());
+      String msg = "line " + i + System.lineSeparator();
+      System.err.print(size + "-" + msg);
+      size += msg.getBytes().length;
     }
     os.close();
   }


http://git-wip-us.apache.org/repos/asf/storm/blob/721c9b3d/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
index 59aad25..0bb44af 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
@@ -60,7 +60,7 @@ public class TestProgressTracker {
 
     TextFileReader.Offset currOffset = reader.getFileOffset();
     Assert.assertNotNull(currOffset);
-    Assert.assertEquals(0, currOffset.byteOffset);
+    Assert.assertEquals(0, currOffset.charOffset);
 
     // read 1st line and ack
    Assert.assertNotNull(reader.next());
