More tests, including tests for FileLock. Fixing unit test TestHdfsSpout.testSimpleSequenceFile
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de37de68 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de37de68 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de37de68 Branch: refs/heads/1.x-branch Commit: de37de68f3a97c6e0d4d6aa38f972fd8d1ecb032 Parents: dcc930b Author: Roshan Naik <[email protected]> Authored: Mon Dec 14 16:19:54 2015 -0800 Committer: Roshan Naik <[email protected]> Committed: Thu Jan 14 11:34:56 2016 -0800 ---------------------------------------------------------------------- .../org/apache/storm/hdfs/common/HdfsUtils.java | 1 - .../org/apache/storm/hdfs/spout/FileLock.java | 47 +++- .../org/apache/storm/hdfs/spout/HdfsSpout.java | 2 +- .../apache/storm/hdfs/spout/TestDirLock.java | 13 +- .../apache/storm/hdfs/spout/TestFileLock.java | 273 ++++++++++++++++++- .../apache/storm/hdfs/spout/TestHdfsSpout.java | 12 +- 6 files changed, 314 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java index e8c32aa..0574c6a 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.ipc.RemoteException; -import org.apache.storm.hdfs.spout.DirLock; import java.io.IOException; import java.util.ArrayList; 
http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java index 1974e44..76a459d 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java @@ -23,12 +23,13 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.ipc.RemoteException; import org.apache.storm.hdfs.common.HdfsUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; -import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.Collection; @@ -44,12 +45,12 @@ public class FileLock { private final FileSystem fs; private final String componentID; private final Path lockFile; - private final DataOutputStream lockFileStream; + private final FSDataOutputStream lockFileStream; private LogEntry lastEntry; private static final Logger log = LoggerFactory.getLogger(DirLock.class); - private FileLock(FileSystem fs, Path lockFile, DataOutputStream lockFileStream, String spoutId) + private FileLock(FileSystem fs, Path lockFile, FSDataOutputStream lockFileStream, String spoutId) throws IOException { this.fs = fs; this.lockFile = lockFile; @@ -83,7 +84,8 @@ public class FileLock { lockFileStream.writeBytes(System.lineSeparator() + line); else lockFileStream.writeBytes(line); - lockFileStream.flush(); + lockFileStream.hflush(); + lastEntry = entry; // update this only after writing to hdfs } @@ -125,7 
+127,8 @@ public class FileLock { */ public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan) throws IOException { - if( fs.getFileStatus(lockFile).getModificationTime() >= olderThan ) { + long modifiedTime = fs.getFileStatus(lockFile).getModificationTime(); + if( modifiedTime <= olderThan ) { // look //Impt: HDFS timestamp may not reflect recent appends, so we double check the // timestamp in last line of file to see when the last update was made LogEntry lastEntry = getLastEntry(fs, lockFile); @@ -158,18 +161,28 @@ public class FileLock { // takes ownership of the lock file /** - * Takes ownership of the lock file. + * Takes ownership of the lock file if possible. * @param lockFile * @param lastEntry last entry in the lock file. this param is an optimization. * we dont scan the lock file again to find its last entry here since * its already been done once by the logic used to check if the lock * file is stale. so this value comes from that earlier scan. * @param spoutId spout id - * @return + * @throws IOException if unable to acquire + * @return null if lock File is being used by another thread */ public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId) throws IOException { - return new FileLock(fs, lockFile, spoutId, lastEntry); + try { + return new FileLock(fs, lockFile, spoutId, lastEntry); + } catch (RemoteException e) { + if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) { + log.info("Lock file {} is currently open. cannot transfer ownership on.", lockFile); + return null; + } else { // unexpected error + throw e; + } + } } /** @@ -188,15 +201,19 @@ public class FileLock { long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000); Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan); - // locate oldest expired lock file (if any) and take ownership + // locate expired lock files (if any). 
Try to take ownership (oldest lock first) for (Path file : listing) { if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) ) continue; LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan); - if(lastEntry!=null) - return FileLock.takeOwnership(fs, file, lastEntry, spoutId); + if(lastEntry!=null) { + FileLock lock = FileLock.takeOwnership(fs, file, lastEntry, spoutId); + if(lock!=null) + return lock; + } } - log.info("No abandoned files found"); + if(listing.isEmpty()) + log.info("No abandoned files to be refound"); return null; } @@ -209,14 +226,14 @@ public class FileLock { * @param fs * @param lockFilesDir * @param locktimeoutSec - * @param spoutId * @return a Pair<lock file path, last entry in lock file> .. if expired lock file found * @throws IOException */ - public static HdfsUtils.Pair<Path,LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId) + public static HdfsUtils.Pair<Path,LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec) throws IOException { // list files - long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000); + long now = System.currentTimeMillis(); + long olderThan = now - (locktimeoutSec*1000); Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan); // locate oldest expired lock file (if any) and take ownership http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index d8aa3f4..7977b96 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -504,7 +504,7 @@ 
public class HdfsSpout extends BaseRichSpout { // 3 - if clocks are not in sync .. if( lastExpiredLock == null ) { // just make a note of the oldest expired lock now and check if its still unmodified after lockTimeoutSec - lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId); + lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec); lastExpiredLockTime = System.currentTimeMillis(); return null; } http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java index bdb0cdf..667248e 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java @@ -127,18 +127,21 @@ public class TestDirLock { class DirLockingThread extends Thread { + private int thdNum; private final FileSystem fs; private final Path dir; public boolean cleanExit = false; - public DirLockingThread(int thdNum,FileSystem fs, Path dir) throws IOException { + public DirLockingThread(int thdNum,FileSystem fs, Path dir) + throws IOException { + this.thdNum = thdNum; this.fs = fs; this.dir = dir; - Thread.currentThread().setName("DirLockingThread-" + thdNum); } @Override public void run() { + Thread.currentThread().setName("DirLockingThread-" + thdNum); DirLock lock = null; try { do { @@ -146,7 +149,7 @@ public class TestDirLock { lock = DirLock.tryLock(fs, dir); System.err.println("Acquired lock " + getName()); if(lock==null) { - System.out.println("Retrying lock - " + Thread.currentThread().getId()); + System.out.println("Retrying lock - " + getName()); } } while (lock==null); cleanExit= true; @@ -164,7 
+167,7 @@ public class TestDirLock { } } System.err.println("Thread exiting " + getName()); - } + } // run() - } + } // class DirLockingThread } http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java index 8031041..1f22a5b 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java @@ -20,10 +20,12 @@ package org.apache.storm.hdfs.spout; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.storm.hdfs.common.HdfsUtils; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -31,7 +33,11 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import java.io.BufferedReader; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; public class TestFileLock { @@ -41,8 +47,8 @@ public class TestFileLock { static String hdfsURI; static HdfsConfiguration conf = new HdfsConfiguration(); - private Path filesDir = new Path("/tmp/lockdir"); - private Path locksDir = new Path("/tmp/lockdir"); + private Path filesDir = new Path("/tmp/filesdir"); + private Path locksDir = new Path("/tmp/locskdir"); @BeforeClass public static void setupClass() throws IOException { @@ -70,7 +76,7 @@ public class TestFileLock { } @Test - public void testBasic() 
throws Exception { + public void testBasicLocking() throws Exception { // create empty files in filesDir Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); Path file2 = new Path(filesDir + Path.SEPARATOR + "file2"); @@ -82,7 +88,7 @@ public class TestFileLock { Assert.assertNotNull(lock1a); Assert.assertTrue(fs.exists(lock1a.getLockFile())); Assert.assertEquals(lock1a.getLockFile().getParent(), locksDir); // verify lock file location - Assert.assertEquals(lock1a.getLockFile().getName(), file1.getName()); // very lock filename + Assert.assertEquals(lock1a.getLockFile().getName(), file1.getName()); // verify lock filename // acquire another lock on file1 and verify it failed FileLock lock1b = FileLock.tryLock(fs, file1, locksDir, "spout1"); @@ -97,14 +103,14 @@ public class TestFileLock { Assert.assertNotNull(lock1c); Assert.assertTrue(fs.exists(lock1c.getLockFile())); Assert.assertEquals(lock1c.getLockFile().getParent(), locksDir); // verify lock file location - Assert.assertEquals(lock1c.getLockFile().getName(), file1.getName()); // very lock filename + Assert.assertEquals(lock1c.getLockFile().getName(), file1.getName()); // verify lock filename // try locking another file2 at the same time FileLock lock2a = FileLock.tryLock(fs, file2, locksDir, "spout1"); Assert.assertNotNull(lock2a); Assert.assertTrue(fs.exists(lock2a.getLockFile())); Assert.assertEquals(lock2a.getLockFile().getParent(), locksDir); // verify lock file location - Assert.assertEquals(lock2a.getLockFile().getName(), file1.getName()); // very lock filename + Assert.assertEquals(lock2a.getLockFile().getName(), file2.getName()); // verify lock filename // release both locks lock2a.release(); @@ -113,5 +119,260 @@ public class TestFileLock { Assert.assertFalse(fs.exists(lock1c.getLockFile())); } + @Test + public void testHeartbeat() throws Exception { + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + fs.create(file1).close(); + + // acquire lock on file1 + FileLock lock1 = 
FileLock.tryLock(fs, file1, locksDir, "spout1"); + Assert.assertNotNull(lock1); + Assert.assertTrue(fs.exists(lock1.getLockFile())); + + ArrayList<String> lines = readTextFile(lock1.getLockFile()); + Assert.assertEquals("heartbeats appear to be missing", 1, lines.size()); + + // hearbeat upon it + lock1.heartbeat("1"); + lock1.heartbeat("2"); + lock1.heartbeat("3"); + + lines = readTextFile(lock1.getLockFile()); + Assert.assertEquals("heartbeats appear to be missing", 4, lines.size()); + + lock1.heartbeat("4"); + lock1.heartbeat("5"); + lock1.heartbeat("6"); + + lines = readTextFile(lock1.getLockFile()); + Assert.assertEquals("heartbeats appear to be missing", 7, lines.size()); + + lock1.release(); + lines = readTextFile(lock1.getLockFile()); + Assert.assertNull(lines); + Assert.assertFalse(fs.exists(lock1.getLockFile())); + } + + @Test + public void testConcurrentLocking() throws IOException, InterruptedException { + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + fs.create(file1).close(); + + FileLockingThread[] thds = startThreads(100, file1, locksDir); + for (FileLockingThread thd : thds) { + thd.join(); + if( !thd.cleanExit) + System.err.println(thd.getName() + " did not exit cleanly"); + Assert.assertTrue(thd.cleanExit); + } + + Path lockFile = new Path(locksDir + Path.SEPARATOR + file1.getName()); + Assert.assertFalse(fs.exists(lockFile)); + } + + private FileLockingThread[] startThreads(int thdCount, Path fileToLock, Path locksDir) + throws IOException { + FileLockingThread[] result = new FileLockingThread[thdCount]; + for (int i = 0; i < thdCount; i++) { + result[i] = new FileLockingThread(i, fs, fileToLock, locksDir, "spout" + Integer.toString(i)); + } + + for (FileLockingThread thd : result) { + thd.start(); + } + return result; + } + + + @Test + public void testStaleLockDetection_SingleLock() throws Exception { + final int LOCK_EXPIRY_SEC = 1; + final int WAIT_MSEC = 1500; + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + 
fs.create(file1).close(); + FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1"); + try { + // acquire lock on file1 + Assert.assertNotNull(lock1); + Assert.assertTrue(fs.exists(lock1.getLockFile())); + Thread.sleep(WAIT_MSEC); // wait for lock to expire + HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); + Assert.assertNotNull(expired); + + // heartbeat, ensure its no longer stale and read back the heartbeat data + lock1.heartbeat("1"); + expired = FileLock.locateOldestExpiredLock(fs, locksDir, 1); + Assert.assertNull(expired); + + FileLock.LogEntry lastEntry = lock1.getLastLogEntry(); + Assert.assertNotNull(lastEntry); + Assert.assertEquals("1", lastEntry.fileOffset); + + // wait and check for expiry again + Thread.sleep(WAIT_MSEC); + expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); + Assert.assertNotNull(expired); + } finally { + lock1.release(); + fs.delete(file1, false); + } + } + + @Test + public void testStaleLockDetection_MultipleLocks() throws Exception { + final int LOCK_EXPIRY_SEC = 1; + final int WAIT_MSEC = 1500; + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + Path file2 = new Path(filesDir + Path.SEPARATOR + "file2"); + Path file3 = new Path(filesDir + Path.SEPARATOR + "file3"); + + fs.create(file1).close(); + fs.create(file2).close(); + fs.create(file3).close(); + + // 1) acquire locks on file1,file2,file3 + FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1"); + FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2"); + FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3"); + Assert.assertNotNull(lock1); + Assert.assertNotNull(lock2); + Assert.assertNotNull(lock3); + + try { + HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); + Assert.assertNull(expired); + + // 2) wait for all 3 locks to expire then heart beat on 2 locks and verify 
stale lock + Thread.sleep(WAIT_MSEC); + lock1.heartbeat("1"); + lock2.heartbeat("1"); + + expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); + Assert.assertNotNull(expired); + Assert.assertEquals("spout3", expired.getValue().componentID); + } finally { + lock1.release(); + lock2.release(); + lock3.release(); + fs.delete(file1, false); + fs.delete(file2, false); + fs.delete(file3, false); + } + } + + @Test + public void testStaleLockRecovery() throws Exception { + final int LOCK_EXPIRY_SEC = 1; + final int WAIT_MSEC = 1500; + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + Path file2 = new Path(filesDir + Path.SEPARATOR + "file2"); + Path file3 = new Path(filesDir + Path.SEPARATOR + "file3"); + + fs.create(file1).close(); + fs.create(file2).close(); + fs.create(file3).close(); + + // 1) acquire locks on file1,file2,file3 + FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1"); + FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2"); + FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3"); + Assert.assertNotNull(lock1); + Assert.assertNotNull(lock2); + Assert.assertNotNull(lock3); + + try { + HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC); + Assert.assertNull(expired); + + // 2) wait for all 3 locks to expire then heart beat on 2 locks + Thread.sleep(WAIT_MSEC); + lock1.heartbeat("1"); + lock2.heartbeat("1"); + + //todo: configure the HDFS lease timeout + + // 3) Take ownership of stale lock + FileLock lock3b = FileLock.acquireOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC, "spout1"); +// Assert.assertNotNull(lock3b); +// Assert.assertEquals("Expected lock3 file", lock3b.getLockFile(), lock3.getLockFile()); + }finally { + lock1.release(); + lock2.release(); + lock3.release(); + fs.delete(file1, false); + fs.delete(file2, false); + fs.delete(file3, false); + } + } + + /** return null if file not found */ + private ArrayList<String> 
readTextFile(Path file) throws IOException { + FSDataInputStream os = null; + try { + os = fs.open(file); + if (os == null) + return null; + BufferedReader reader = new BufferedReader(new InputStreamReader(os)); + ArrayList<String> lines = new ArrayList<>(); + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + lines.add(line); + } + return lines; + } catch( FileNotFoundException e) { + return null; + } finally { + if(os!=null) + os.close(); + } + } + + class FileLockingThread extends Thread { + + private int thdNum; + private final FileSystem fs; + public boolean cleanExit = false; + private Path fileToLock; + private Path locksDir; + private String spoutId; + + public FileLockingThread(int thdNum, FileSystem fs, Path fileToLock, Path locksDir, String spoutId) + throws IOException { + this.thdNum = thdNum; + this.fs = fs; + this.fileToLock = fileToLock; + this.locksDir = locksDir; + this.spoutId = spoutId; + } + + @Override + public void run() { + Thread.currentThread().setName("FileLockingThread-" + thdNum); + FileLock lock = null; + try { + do { + System.err.println("Trying lock - " + getName()); + lock = FileLock.tryLock(fs, this.fileToLock, this.locksDir, spoutId); + System.err.println("Acquired lock - " + getName()); + if(lock==null) { + System.out.println("Retrying lock - " + getName()); + } + } while (lock==null); + cleanExit= true; + } catch (Exception e) { + e.printStackTrace(); + } + finally { + try { + if(lock!=null) { + lock.release(); + System.err.println("Released lock - " + getName()); + } + } catch (IOException e) { + e.printStackTrace(System.err); + } + } + System.err.println("Thread exiting - " + getName()); + } // run() + } // class FileLockingThread } http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java ---------------------------------------------------------------------- diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java index 9200c90..d967572 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java @@ -175,17 +175,17 @@ public class TestHdfsSpout { ArrayList<String> result = new ArrayList<>(); for (Path seqFile : seqFiles) { - FSDataInputStream istream = fs.open(seqFile); + Path file = new Path(fs.getUri().toString() + seqFile.toString()); + SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file)); try { - SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(seqFile)); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); - while (reader.next(key, value) ) { - String keyValStr = Arrays.asList(key,value).toString(); + while (reader.next(key, value)) { + String keyValStr = Arrays.asList(key, value).toString(); result.add(keyValStr); } } finally { - istream.close(); + reader.close(); } }// for return result; @@ -235,7 +235,7 @@ public class TestHdfsSpout { System.err.println(re); } - listDir(source); + listDir(archive); Path f1 = new Path(archive + "/file1.seq");
