Add stale lock recovery in DirLock. Add tests for FileLock recovery, DirLock recovery, commit_freq_sec & commit_freq_count, TestHdfsSpout.testLocking, and TestHdfsSemantics; also address some review comments.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/152856d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/152856d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/152856d1

Branch: refs/heads/1.x-branch
Commit: 152856d1156065f51430497629ee37412ac098b2
Parents: de37de6
Author: Roshan Naik <[email protected]>
Authored: Thu Dec 17 14:19:54 2015 -0800
Committer: Roshan Naik <[email protected]>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 .../hdfs/common/CmpFilesByModificationTime.java |   6 +-
 .../org/apache/storm/hdfs/common/HdfsUtils.java |   8 +-
 .../org/apache/storm/hdfs/spout/Configs.java    |   4 +-
 .../org/apache/storm/hdfs/spout/DirLock.java    |  37 +++-
 .../org/apache/storm/hdfs/spout/FileLock.java   |  16 +-
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |  39 ++--
 .../apache/storm/hdfs/spout/TestDirLock.java    |  40 ++--
 .../apache/storm/hdfs/spout/TestFileLock.java   |  33 ++-
 .../storm/hdfs/spout/TestHdfsSemantics.java     | 204 +++++++++++++++++++
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 155 ++++++++++----
 .../src/test/resources/log4j.properties        |  26 +++
 11 files changed, 473 insertions(+), 95 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
index acee9a5..67420aa 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
@@ -18,15 +18,15 @@
 package org.apache.storm.hdfs.common;
 
-import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.FileStatus;
 
 import java.util.Comparator;
 
 public class CmpFilesByModificationTime
-    implements Comparator<LocatedFileStatus> {
+    implements Comparator<FileStatus> {
   @Override
-  public int compare(LocatedFileStatus o1, LocatedFileStatus o2) {
+  public int compare(FileStatus o1, FileStatus o2) {
     return new Long(o1.getModificationTime()).compareTo( o2.getModificationTime() );
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index 0574c6a..e8df78d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -29,13 +29,12 @@ import org.apache.hadoop.ipc.RemoteException;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 
 public class HdfsUtils {
   /** list files sorted by modification time that have not been modified since 'olderThan'. if
    * 'olderThan' is <= 0 then the filtering is disabled */
-  public static Collection<Path> listFilesByModificationTime(FileSystem fs, Path directory, long olderThan)
+  public static ArrayList<Path> listFilesByModificationTime(FileSystem fs, Path directory, long olderThan)
           throws IOException {
     ArrayList<LocatedFileStatus> fstats = new ArrayList<>();
@@ -43,7 +42,7 @@ public class HdfsUtils {
     while( itr.hasNext() ) {
       LocatedFileStatus fileStatus = itr.next();
       if(olderThan>0) {
-        if( fileStatus.getModificationTime()<olderThan )
+        if( fileStatus.getModificationTime()<=olderThan )
           fstats.add(fileStatus);
       }
       else {
@@ -69,7 +68,7 @@ public class HdfsUtils {
     } catch (FileAlreadyExistsException e) {
       return null;
     } catch (RemoteException e) {
-      if( e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) ) {
+      if( e.unwrapRemoteException() instanceof AlreadyBeingCreatedException ) {
         return null;
       } else { // unexpected error
         throw e;
@@ -77,7 +76,6 @@ public class HdfsUtils {
     }
   }
 
-
   public static class Pair<K,V> {
     private K key;
     private V value;
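The switch to unwrapRemoteException() above deserves a note: when create() collides with a file that is still open, HDFS raises AlreadyBeingCreatedException wrapped inside a RemoteException, and unwrapping is more robust than string-comparing class names. The resulting idiom, condensed from HdfsUtils.tryCreateFile in this patch (shown standalone, imports omitted, for readability):

    /** Returns an output stream if we created the file, null if someone else owns it. */
    public static FSDataOutputStream tryCreateFile(FileSystem fs, Path file) throws IOException {
      try {
        return fs.create(file, false);       // non-overwriting create is atomic in HDFS
      } catch (FileAlreadyExistsException e) {
        return null;                         // file already exists and is closed
      } catch (RemoteException e) {
        if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException)
          return null;                       // file exists and its lease is still held
        throw e;                             // unexpected error
      }
    }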
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
index 9a9ae73..93d775b 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -27,8 +27,8 @@ public class Configs {
   public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";        // completed files will be moved here
   public static final String BAD_DIR = "hdfsspout.badfiles.dir";           // unparseable files will be moved here
   public static final String LOCK_DIR = "hdfsspout.lock.dir";              // dir in which lock files will be created
-  public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records
-  public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";     // commit after N secs
+  public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records. 0 disables this.
+  public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";     // commit after N secs. cannot be disabled.
   public static final String MAX_DUPLICATE = "hdfsspout.max.duplicate";
   public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec";  // inactivity duration after which locks are considered candidates for being reassigned to another spout
   public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync";    // if clocks on machines in the Storm cluster are in sync

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index 0ff2f37..06ca749 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 
 /**
- * Facility to sychronize access to HDFS directory. The lock itself is represented
+ * Facility to synchronize access to HDFS directory. The lock itself is represented
  * as a file in the same directory. Relies on atomic file creation.
  */
 public class DirLock {
@@ -51,7 +51,7 @@ public class DirLock {
    * @throws IOException if there were errors
    */
   public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
-    Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
+    Path lockFile = getDirLockFile(dir);
 
     try {
       FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
@@ -69,6 +69,10 @@ public class DirLock {
     }
   }
 
+  private static Path getDirLockFile(Path dir) {
+    return new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
+  }
+
   private static String threadInfo () {
     return "ThdId=" + Thread.currentThread().getId() + ", ThdName="
             + Thread.currentThread().getName();
@@ -80,6 +84,35 @@ public class DirLock {
     log.info("Thread {} released dir lock {} ", threadInfo(), lockFile);
   }
 
+  /** if the lock on the directory is stale, take ownership */
+  public static DirLock takeOwnershipIfStale(FileSystem fs, Path dirToLock, int lockTimeoutSec) {
+    Path dirLockFile = getDirLockFile(dirToLock);
+
+    long now = System.currentTimeMillis();
+    long expiryTime = now - (lockTimeoutSec*1000);
+
+    try {
+      long modTime = fs.getFileStatus(dirLockFile).getModificationTime();
+      if(modTime <= expiryTime)
+        return takeOwnership(fs, dirLockFile);
+      return null;
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+
+  private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) throws IOException {
+    // delete and recreate lock file
+    if( fs.delete(dirLockFile, false) ) { // returns false if somebody else already deleted it (to take ownership)
+      FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, dirLockFile);
+      if(ostream!=null)
+        ostream.close();
+      return new DirLock(fs, dirLockFile);
+    }
+    return null;
+  }
+
   public Path getLockFile() {
     return lockFile;
   }
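The takeover above leans on two HDFS guarantees that TestHdfsSemantics (further down) verifies: delete() returns true for exactly one racing caller, and a non-overwriting create() succeeds for at most one caller. A self-contained sketch of the same flow, with assumptions labeled (the method and variable names here are mine; the patch itself routes creation through HdfsUtils.tryCreateFile and wraps the result in a DirLock):

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.io.IOException;

    /** Sketch: returns the lock file path if we took ownership of a stale lock, else null. */
    static Path takeOverIfStale(FileSystem fs, Path lockFile, int lockTimeoutSec) {
      long expiryTime = System.currentTimeMillis() - lockTimeoutSec * 1000L; // 1000L avoids int overflow
      try {
        if (fs.getFileStatus(lockFile).getModificationTime() > expiryTime)
          return null;                              // lock is still fresh
        if (!fs.delete(lockFile, false))
          return null;                              // another spout won the race to delete it
        try (FSDataOutputStream os = fs.create(lockFile, false)) { // atomic, non-overwriting
          return lockFile;                          // we own the lock now
        }
      } catch (IOException e) {
        return null;                                // lock file vanished, or we lost the create race
      }
    }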
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index 76a459d..b40d1dd 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -92,6 +92,12 @@ public class FileLock {
   public void release() throws IOException {
     lockFileStream.close();
     fs.delete(lockFile, false);
+    log.debug("Released lock file {}", lockFile);
+  }
+
+  // for testing only.. invoked via reflection
+  private void forceCloseLockFile() throws IOException {
+    lockFileStream.close();
   }
 
   /** returns lock on file or null if file is already locked. throws if unexpected problem */
@@ -135,6 +141,7 @@ public class FileLock {
       if(lastEntry==null) {
         throw new RuntimeException(lockFile.getName() + " is empty. this file is invalid.");
       }
+      log.debug("{} , lastModified= {}, expiryTime= {}, diff= {}", lockFile, lastEntry.eventTime, olderThan, lastEntry.eventTime-olderThan );
       if( lastEntry.eventTime <= olderThan )
         return lastEntry;
     }
@@ -176,8 +183,8 @@ public class FileLock {
     try {
       return new FileLock(fs, lockFile, spoutId, lastEntry);
     } catch (RemoteException e) {
-      if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
-        log.info("Lock file {} is currently open. cannot transfer ownership on.", lockFile);
+      if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
+        log.info("Lock file {} is currently open. Cannot transfer ownership.", lockFile);
         return null;
       } else { // unexpected error
         throw e;
@@ -198,7 +205,8 @@ public class FileLock {
   public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
           throws IOException {
     // list files
-    long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
+    long now = System.currentTimeMillis();
+    long olderThan = now - (locktimeoutSec*1000);
     Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
 
     // locate expired lock files (if any). Try to take ownership (oldest lock first)
@@ -213,7 +221,7 @@ public class FileLock {
       }
     }
     if(listing.isEmpty())
-      log.info("No abandoned files to be refound");
+      log.info("No abandoned lock files found");
     return null;
   }
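FileLock recovery works off the entries a lock holder appends to its own lock file: each commit logs a timestamped progress record, so the newest entry doubles as a heartbeat and as the resume point. A hedged usage sketch of the recovery entry point (acquireOldestExpiredLock and getLastLogEntry are from the patch; the surrounding spout wiring is simplified):

    // Scan lockDirPath for locks whose newest log entry is older than the timeout,
    // adopt the oldest such lock, and resume its file from the recorded offset.
    FileLock recovered = FileLock.acquireOldestExpiredLock(fs, lockDirPath, lockTimeoutSec, spoutId);
    if (recovered != null) {
      String resumeFromOffset = recovered.getLastLogEntry().fileOffset;
      // reopen the data file and seek to resumeFromOffset before emitting further tuples
    }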
this file is invalid."); } + log.error("{} , lastModified= {}, expiryTime= {}, diff= {}", lockFile, lastEntry.eventTime, olderThan, lastEntry.eventTime-olderThan ); if( lastEntry.eventTime <= olderThan ) return lastEntry; } @@ -176,8 +183,8 @@ public class FileLock { try { return new FileLock(fs, lockFile, spoutId, lastEntry); } catch (RemoteException e) { - if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) { - log.info("Lock file {} is currently open. cannot transfer ownership on.", lockFile); + if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException) { + log.info("Lock file {} is currently open. Cannot transfer ownership.", lockFile); return null; } else { // unexpected error throw e; @@ -198,7 +205,8 @@ public class FileLock { public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId) throws IOException { // list files - long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000); + long now = System.currentTimeMillis(); + long olderThan = now - (locktimeoutSec*1000); Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan); // locate expired lock files (if any). Try to take ownership (oldest lock first) @@ -213,7 +221,7 @@ public class FileLock { } } if(listing.isEmpty()) - log.info("No abandoned files to be refound"); + log.info("No abandoned lock files found"); return null; } http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index 7977b96..50c2172 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -124,14 +124,14 @@ public class HdfsSpout extends BaseRichSpout { return; } - // 2) If no failed tuples, then send tuples from hdfs + // 2) If no failed tuples to be retried, then send tuples from hdfs while (true) { try { // 3) Select a new file if one is not open already if (reader == null) { reader = pickNextFile(); if (reader == null) { - LOG.info("Currently no new files to process under : " + sourceDirPath); + LOG.debug("Currently no new files to process under : " + sourceDirPath); return; } } @@ -165,8 +165,9 @@ public class HdfsSpout extends BaseRichSpout { LOG.error("Parsing error when processing at file location " + getFileProgress(reader) + ". Skipping remainder of file.", e); markFileAsBad(reader.getFilePath()); - // note: Unfortunately not emitting anything here due to parse error - // will trigger the configured spout wait strategy which is unnecessary + // Note: We don't return from this method on ParseException to avoid triggering the + // spout wait strategy (due to no emits). 
Instead we go back into the loop and + // generate a tuple from next file } } @@ -192,7 +193,7 @@ public class HdfsSpout extends BaseRichSpout { TimerTask timerTask = new TimerTask() { @Override public void run() { - commitTimeElapsed.set(false); + commitTimeElapsed.set(true); } }; commitTimer.schedule(timerTask, commitFrequencySec * 1000); @@ -206,7 +207,8 @@ public class HdfsSpout extends BaseRichSpout { private void markFileAsDone(Path filePath) { fileReadCompletely = false; try { - renameCompletedFile(reader.getFilePath()); + Path newFile = renameCompletedFile(reader.getFilePath()); + LOG.info("Completed processing {}", newFile); } catch (IOException e) { LOG.error("Unable to archive completed file" + filePath, e); } @@ -220,7 +222,7 @@ public class HdfsSpout extends BaseRichSpout { String originalName = new Path(fileNameMinusSuffix).getName(); Path newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName); - LOG.info("Moving bad file to " + newFile); + LOG.info("Moving bad file {} to {} ", originalName, newFile); try { if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception @@ -254,7 +256,7 @@ public class HdfsSpout extends BaseRichSpout { public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; final String FILE_SYSTEM = "filesystem"; - LOG.info("Opening"); + LOG.info("Opening HDFS Spout"); this.collector = collector; this.hdfsConfig = new Configuration(); this.tupleCounter = 0; @@ -436,7 +438,8 @@ public class HdfsSpout extends BaseRichSpout { } private boolean canCommitNow() { - if( acksSinceLastCommit >= commitFrequencyCount ) + + if( commitFrequencyCount>0 && acksSinceLastCommit >= commitFrequencyCount ) return true; return commitTimeElapsed.get(); } @@ -455,7 +458,7 @@ public class HdfsSpout extends BaseRichSpout { if (lock != null) { Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath); String resumeFromOffset = lock.getLastLogEntry().fileOffset; - LOG.info("Processing abandoned file : {}", file); + LOG.info("Resuming processing of abandoned file : {}", file); return createFileReader(file, resumeFromOffset); } @@ -468,12 +471,12 @@ public class HdfsSpout extends BaseRichSpout { if( file.getName().endsWith(ignoreSuffix) ) continue; - LOG.info("Processing : {} ", file); lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId); if( lock==null ) { - LOG.info("Unable to get lock, so skipping file: {}", file); + LOG.debug("Unable to get lock, so skipping file: {}", file); continue; // could not lock, so try another file. 
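Two things above are worth pulling together. First, the commit policy: COMMIT_FREQ_COUNT commits after N acked tuples (0 disables that path), while COMMIT_FREQ_SEC always stays armed via a TimerTask that flips an AtomicBoolean. A minimal sketch of that interplay (the CommitPolicy wrapper class is hypothetical, not in the patch, and arms the timer only once where the spout re-arms it after every commit):

    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.atomic.AtomicBoolean;

    class CommitPolicy {
      private final int commitFrequencyCount;          // 0 disables count-based commits
      private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
      private int acksSinceLastCommit = 0;

      CommitPolicy(int commitFrequencyCount, int commitFrequencySec) {
        this.commitFrequencyCount = commitFrequencyCount;
        new Timer(true).schedule(new TimerTask() {     // daemon timer
          @Override public void run() { commitTimeElapsed.set(true); }
        }, commitFrequencySec * 1000L);
      }

      void onAck() { acksSinceLastCommit++; }

      boolean canCommitNow() {
        if (commitFrequencyCount > 0 && acksSinceLastCommit >= commitFrequencyCount)
          return true;
        return commitTimeElapsed.get();                // time-based path cannot be disabled
      }

      void onCommit() {                                // reset both triggers after committing
        acksSinceLastCommit = 0;
        commitTimeElapsed.set(false);
      }
    }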
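Second, the recovery sequence that ties the two lock types together, condensed from getOldestExpiredLock() above (the clocks-not-in-sync branch is omitted here, and parameters are passed explicitly to keep the sketch self-contained):

    static FileLock recoverAbandonedLock(FileSystem hdfs, Path lockDirPath,
                                         int lockTimeoutSec, String spoutId) throws IOException {
      // 1 - serialize scans of the lock dir; recover the dir lock itself if it went stale
      DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath);
      if (dirlock == null) {
        dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, lockTimeoutSec);
        if (dirlock == null)
          return null;                // someone else holds a live dir lock; try again later
      }
      try {
        // 2 - adopt the oldest file lock whose heartbeat has gone quiet
        return FileLock.acquireOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId);
      } finally {
        dirlock.release();
      }
    }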
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
index 667248e..a7b73d6 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -25,16 +25,12 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 
@@ -45,8 +41,8 @@ public class TestDirLock {
   static FileSystem fs;
   static String hdfsURI;
   static HdfsConfiguration conf = new HdfsConfiguration();
-
-  private Path lockDir = new Path("/tmp/lockdir");
+  static final int LOCK_EXPIRY_SEC = 1;
+  private Path locksDir = new Path("/tmp/lockdir");
 
   @BeforeClass
   public static void setupClass() throws IOException {
@@ -65,23 +61,23 @@ public class TestDirLock {
 
   @Before
   public void setUp() throws Exception {
-    assert fs.mkdirs(lockDir) ;
+    assert fs.mkdirs(locksDir) ;
   }
 
   @After
   public void tearDown() throws Exception {
-    fs.delete(lockDir, true);
+    fs.delete(locksDir, true);
   }
 
   @Test
   public void testBasicLocking() throws Exception {
     // 1 grab lock
-    DirLock lock = DirLock.tryLock(fs, lockDir);
+    DirLock lock = DirLock.tryLock(fs, locksDir);
     Assert.assertTrue(fs.exists(lock.getLockFile()));
 
     // 2 try to grab another lock while dir is locked
-    DirLock lock2 = DirLock.tryLock(fs, lockDir); // should fail
+    DirLock lock2 = DirLock.tryLock(fs, locksDir); // should fail
     Assert.assertNull(lock2);
 
     // 3 let go first lock
@@ -89,7 +85,7 @@ public class TestDirLock {
     Assert.assertFalse(fs.exists(lock.getLockFile()));
 
     // 4 try locking again
-    lock2 = DirLock.tryLock(fs, lockDir);
+    lock2 = DirLock.tryLock(fs, locksDir);
     Assert.assertTrue(fs.exists(lock2.getLockFile()));
     lock2.release();
     Assert.assertFalse(fs.exists(lock.getLockFile()));
@@ -99,7 +95,7 @@ public class TestDirLock {
 
   @Test
   public void testConcurrentLocking() throws Exception {
-    DirLockingThread[] thds = startThreads(100, lockDir );
+    DirLockingThread[] thds = startThreads(100, locksDir);
     for (DirLockingThread thd : thds) {
       thd.join();
       if( !thd.cleanExit)
@@ -107,7 +103,7 @@ public class TestDirLock {
       Assert.assertTrue(thd.cleanExit);
     }
 
-    Path lockFile = new Path(lockDir + Path.SEPARATOR + DirLock.DIR_LOCK_FILE);
+    Path lockFile = new Path(locksDir + Path.SEPARATOR + DirLock.DIR_LOCK_FILE);
     Assert.assertFalse(fs.exists(lockFile));
   }
 
@@ -124,6 +120,24 @@ public class TestDirLock {
     return result;
   }
 
+  @Test
+  public void testLockRecovery() throws Exception {
+    DirLock lock1 = DirLock.tryLock(fs, locksDir);  // should pass
+    Assert.assertNotNull(lock1);
+
+    DirLock lock2 = DirLock.takeOwnershipIfStale(fs, locksDir, LOCK_EXPIRY_SEC); // should fail
+    Assert.assertNull(lock2);
+
+    Thread.sleep(LOCK_EXPIRY_SEC*1000 + 500); // wait for lock to expire
+    Assert.assertTrue(fs.exists(lock1.getLockFile()));
+
+    DirLock lock3 = DirLock.takeOwnershipIfStale(fs, locksDir, LOCK_EXPIRY_SEC); // should pass now
+    Assert.assertNotNull(lock3);
+    Assert.assertTrue(fs.exists(lock3.getLockFile()));
+    lock3.release();
+    Assert.assertFalse(fs.exists(lock3.getLockFile()));
+    lock1.release(); // should not throw
+  }
 
   class DirLockingThread extends Thread {
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
index 1f22a5b..a97b3f2 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -33,10 +33,12 @@
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+
 import java.io.BufferedReader;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 
 public class TestFileLock {
@@ -68,11 +70,13 @@ public class TestFileLock {
   @Before
   public void setUp() throws Exception {
     assert fs.mkdirs(filesDir) ;
+    assert fs.mkdirs(locksDir) ;
   }
 
   @After
   public void tearDown() throws Exception {
     fs.delete(filesDir, true);
+    fs.delete(locksDir, true);
   }
 
   @Test
@@ -261,9 +265,9 @@ public class TestFileLock {
   }
 
   @Test
-  public void testStaleLockRecovery() throws Exception {
+  public void testLockRecovery() throws Exception {
     final int LOCK_EXPIRY_SEC = 1;
-    final int WAIT_MSEC = 1500;
+    final int WAIT_MSEC = LOCK_EXPIRY_SEC*1000 + 500;
     Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
     Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
     Path file3 = new Path(filesDir + Path.SEPARATOR + "file3");
@@ -284,27 +288,38 @@ public class TestFileLock {
       HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
       Assert.assertNull(expired);
 
+      // 1) Simulate lock file lease expiring and getting closed by HDFS
+      closeUnderlyingLockFile(lock3);
+
       // 2) wait for all 3 locks to expire then heart beat on 2 locks
-      Thread.sleep(WAIT_MSEC);
+      Thread.sleep(WAIT_MSEC*2); // wait for locks to expire
       lock1.heartbeat("1");
       lock2.heartbeat("1");
 
-      //todo: configure the HDFS lease timeout
-
       // 3) Take ownership of stale lock
       FileLock lock3b = FileLock.acquireOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC, "spout1");
-//      Assert.assertNotNull(lock3b);
-//      Assert.assertEquals("Expected lock3 file", lock3b.getLockFile(), lock3.getLockFile());
-    }finally {
+      Assert.assertNotNull(lock3b);
+      Assert.assertEquals("Expected lock3 file", Path.getPathWithoutSchemeAndAuthority(lock3b.getLockFile()), lock3.getLockFile());
+    } finally {
       lock1.release();
       lock2.release();
       lock3.release();
       fs.delete(file1, false);
       fs.delete(file2, false);
-      fs.delete(file3, false);
+      try {
+        fs.delete(file3, false);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
     }
   }
 
+  private void closeUnderlyingLockFile(FileLock lock) throws ReflectiveOperationException {
+    Method m = FileLock.class.getDeclaredMethod("forceCloseLockFile");
+    m.setAccessible(true);
+    m.invoke(lock);
+  }
+
   /** return null if file not found */
   private ArrayList<String> readTextFile(Path file) throws IOException {
     FSDataInputStream os = null;
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
new file mode 100644
index 0000000..6628cc9
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestHdfsSemantics {
+
+  static MiniDFSCluster.Builder builder;
+  static MiniDFSCluster hdfsCluster;
+  static FileSystem fs;
+  static String hdfsURI;
+  static HdfsConfiguration conf = new HdfsConfiguration();
+
+  private Path dir = new Path("/tmp/filesdir");
+
+  @BeforeClass
+  public static void setupClass() throws IOException {
+    conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, "5000");
+    builder = new MiniDFSCluster.Builder(new Configuration());
+    hdfsCluster = builder.build();
+    fs = hdfsCluster.getFileSystem();
+    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+  }
+
+  @AfterClass
+  public static void teardownClass() throws IOException {
+    fs.close();
+    hdfsCluster.shutdown();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    assert fs.mkdirs(dir) ;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.delete(dir, true);
+  }
+
+
+  @Test
+  public void testDeleteSemantics() throws Exception {
+    Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+    // 1) Delete absent file - should return false
+    Assert.assertFalse(fs.exists(file));
+    try {
+      Assert.assertFalse(fs.delete(file, false));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    // 2) deleting open file - should return true
+    fs.create(file, false);
+    Assert.assertTrue(fs.delete(file, false));
+
+    // 3) deleting closed file - should return true
+    FSDataOutputStream os = fs.create(file, false);
+    os.close();
+    Assert.assertTrue(fs.exists(file));
+    Assert.assertTrue(fs.delete(file, false));
+    Assert.assertFalse(fs.exists(file));
+  }
+
+  @Test
+  public void testConcurrentDeletion() throws Exception {
+    Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+    fs.create(file).close();
+    // 1 concurrent deletion - only one thread should succeed
+    FileDeletionThread[] thds = startThreads(10, file);
+    int successCount = 0;
+    for (FileDeletionThread thd : thds) {
+      thd.join();
+      if( thd.succeeded)
+        successCount++;
+      if(thd.exception!=null)
+        Assert.assertNotNull(thd.exception);
+    }
+    System.err.println(successCount);
+    Assert.assertEquals(1, successCount);
+  }
+
+  @Test
+  public void testAppendSemantics() throws Exception {
+    //1 try to append to an open file
+    Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+    FSDataOutputStream os1 = fs.create(file1, false);
+    try {
+      fs.append(file1); // should fail
+      Assert.assertTrue("Append did not throw an exception", false);
+    } catch (RemoteException e) {
+      // expecting AlreadyBeingCreatedException inside RemoteException
+      Assert.assertEquals(AlreadyBeingCreatedException.class, e.unwrapRemoteException().getClass());
+    }
+
+    //2 try to append to a closed file
+    os1.close();
+    FSDataOutputStream os2 = fs.append(file1); // should pass
+    os2.close();
+  }
+
+  @Test
+  public void testDoubleCreateSemantics() throws Exception {
+    //1 create an already existing open file w/o override flag
+    Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+    FSDataOutputStream os1 = fs.create(file1, false);
+    try {
+      fs.create(file1, false); // should fail
+      Assert.assertTrue("Create did not throw an exception", false);
+    } catch (RemoteException e) {
+      Assert.assertEquals(AlreadyBeingCreatedException.class, e.unwrapRemoteException().getClass());
+    }
+
+    //2 close file and retry creation
+    os1.close();
+    try {
+      fs.create(file1, false); // should still fail
+    } catch (FileAlreadyExistsException e) {
+      // expecting this exception
+    }
+
+    //3 delete file and retry creation
+    fs.delete(file1, false);
+    FSDataOutputStream os2 = fs.create(file1, false); // should pass
+    Assert.assertNotNull(os2);
+    os2.close();
+  }
+
+
+  private FileDeletionThread[] startThreads(int thdCount, Path file)
+          throws IOException {
+    FileDeletionThread[] result = new FileDeletionThread[thdCount];
+    for (int i = 0; i < thdCount; i++) {
+      result[i] = new FileDeletionThread(i, fs, file);
+    }
+
+    for (FileDeletionThread thd : result) {
+      thd.start();
+    }
+    return result;
+  }
+
+  private static class FileDeletionThread extends Thread {
+
+    private final int thdNum;
+    private final FileSystem fs;
+    private final Path file;
+    public boolean succeeded;
+    public Exception exception = null;
+
+    public FileDeletionThread(int thdNum, FileSystem fs, Path file)
+            throws IOException {
+      this.thdNum = thdNum;
+      this.fs = fs;
+      this.file = file;
+    }
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName("FileDeletionThread-" + thdNum);
+      try {
+        succeeded = fs.delete(file, false);
+      } catch (Exception e) {
+        exception = e;
+      }
    } // run()
+
+  } // class FileDeletionThread
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index d967572..98d21f8 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -105,7 +105,7 @@ public class TestHdfsSpout {
 
   @After
   public void shutDown() throws IOException {
-    fs.delete(new Path(baseFolder.toString()),true);
+    fs.delete(new Path(baseFolder.toString()), true);
   }
 
   @Test
@@ -134,7 +134,6 @@ public class TestHdfsSpout {
     checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
   }
 
-
   private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException {
     ArrayList<String> expected = new ArrayList<>();
     for (Path txtFile : txtFiles) {
@@ -196,10 +195,6 @@ public class TestHdfsSpout {
     listDir(archive);
   }
 
-  private List<String> listBadDir() throws IOException {
-    return listDir(badfiles);
-  }
-
   private List<String> listDir(Path p) throws IOException {
     ArrayList<String> result = new ArrayList<>();
     System.err.println("*** Listing " + p);
@@ -207,7 +202,7 @@ public class TestHdfsSpout {
     while ( fileNames.hasNext() ) {
       LocatedFileStatus fileStatus = fileNames.next();
       System.err.println(fileStatus.getPath());
-      result.add(fileStatus.getPath().toString());
+      result.add(Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString());
     }
     return result;
   }
@@ -244,50 +239,127 @@ public class TestHdfsSpout {
     checkCollectorOutput_seq((MockCollector) spout.getCollector(), f1, f2);
   }
 
-//  - TODO: this test needs the spout to fail with an exception
   @Test
-  public void testFailure() throws Exception {
-
+  public void testReadFailures() throws Exception {
+    // 1) create a couple of input files to read
     Path file1 = new Path(source.toString() + "/file1.txt");
-    createTextFile(file1, 5);
+    Path file2 = new Path(source.toString() + "/file2.txt");
 
-    listDir(source);
+    createTextFile(file1, 6);
+    createTextFile(file2, 7);
+    Assert.assertEquals(2, listDir(source).size());
 
+    // 2) run spout
     Map conf = getDefaultConfig();
-//    conf.put(HdfsSpout.Configs.BACKOFF_SEC, "2");
     HdfsSpout spout = makeSpout(0, conf, MockTextFailingReader.class.getName());
-    List<String> res = runSpout(spout, "r3");
-    for (String re : res) {
-      System.err.println(re);
-    }
-
-    listCompletedDir();
-    List<String> badFiles = listBadDir();
-    Assert.assertEquals( badFiles.size(), 1);
-    Assert.assertEquals(((MockCollector) spout.getCollector()).lines.size(), 1);
+    List<String> res = runSpout(spout, "r11");
+    String[] expected = new String[] {"[line 0]","[line 1]","[line 2]","[line 0]","[line 1]","[line 2]"};
+    Assert.assertArrayEquals(expected, res.toArray());
+
+    // 3) make sure 6 lines (3 from each file) were read in all
+    Assert.assertEquals(((MockCollector) spout.getCollector()).lines.size(), 6);
+    ArrayList<Path> badFiles = HdfsUtils.listFilesByModificationTime(fs, badfiles, 0);
+    Assert.assertEquals(badFiles.size(), 2);
   }
 
-//  @Test
+  // check lock creation/deletion and contents
+  @Test
   public void testLocking() throws Exception {
+    Path file1 = new Path(source.toString() + "/file1.txt");
+    createTextFile(file1, 10);
+
+    // 0) config spout to log progress in lock file for each tuple
+    Map conf = getDefaultConfig();
+    conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+    conf.put(Configs.COMMIT_FREQ_SEC, "100"); // make it irrelevant
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
+    // 1) read initial lines in file, then check if lock exists
+    List<String> res = runSpout(spout, "r5");
+    Assert.assertEquals(5, res.size());
+    List<String> lockFiles = listDir(spout.getLockDirPath());
+    Assert.assertEquals(1, lockFiles.size());
+
+    // 2) check log file content line count == tuples emitted + 1
+    List<String> lines = readTextFile(fs, lockFiles.get(0));
+    Assert.assertEquals(lines.size(), res.size()+1);
+
+    // 3) read remaining lines in file, then ensure lock is gone
+    runSpout(spout, "r6");
+    lockFiles = listDir(spout.getLockDirPath());
+    Assert.assertEquals(0, lockFiles.size());
+
+
+    // 4) --- Create another input file and reverify same behavior ---
+    Path file2 = new Path(source.toString() + "/file2.txt");
+    createTextFile(file2, 10);
+
+    // 5) read initial lines in file, then check if lock exists
+    res = runSpout(spout, "r5");
+    Assert.assertEquals(15, res.size());
+    lockFiles = listDir(spout.getLockDirPath());
+    Assert.assertEquals(1, lockFiles.size());
+
+    // 6) check log file content line count == tuples emitted + 1
+    lines = readTextFile(fs, lockFiles.get(0));
+    Assert.assertEquals(6, lines.size());
+
+    // 7) read remaining lines in file, then ensure lock is gone
+    runSpout(spout, "r6");
+    lockFiles = listDir(spout.getLockDirPath());
+    Assert.assertEquals(0, lockFiles.size());
+  }
+
+  @Test
+  public void testLockLoggingFreqCount() throws Exception {
     Path file1 = new Path(source.toString() + "/file1.txt");
-    createTextFile(file1, 5);
+    createTextFile(file1, 10);
 
-    listDir(source);
+    // 0) config spout to log progress in lock file every 2 tuples
+    Map conf = getDefaultConfig();
+    conf.put(Configs.COMMIT_FREQ_COUNT, "2");  // 1 lock log entry every 2 tuples
+    conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // make it irrelevant for this test
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
+    // 1) read 5 lines in file,
+    runSpout(spout, "r5");
+
+    // 2) check log file contents
+    String lockFile = listDir(spout.getLockDirPath()).get(0);
+    List<String> lines = readTextFile(fs, lockFile);
+    Assert.assertEquals(lines.size(), 3);
+
+    // 3) read 6th line and see if another log entry was made
+    runSpout(spout, "r1");
+    lines = readTextFile(fs, lockFile);
+    Assert.assertEquals(lines.size(), 4);
+  }
 
+  @Test
+  public void testLockLoggingFreqSec() throws Exception {
+    Path file1 = new Path(source.toString() + "/file1.txt");
+    createTextFile(file1, 10);
+
+    // 0) config spout to log progress in lock file every 2 secs
     Map conf = getDefaultConfig();
-    conf.put(Configs.COMMIT_FREQ_COUNT, "1");
-    conf.put(Configs.COMMIT_FREQ_SEC, "1");
+    conf.put(Configs.COMMIT_FREQ_COUNT, "0"); // disable it
+    conf.put(Configs.COMMIT_FREQ_SEC, "2");   // log every 2 sec
+
     HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
-    List<String> res = runSpout(spout,"r4");
-    for (String re : res) {
-      System.err.println(re);
-    }
 
-    List<String> lockFiles = listDir(spout.getLockDirPath());
-    Assert.assertEquals(1, lockFiles.size());
-    runSpout(spout, "r3");
-    List<String> lines = readTextFile(fs, lockFiles.get(0));
-    System.err.println(lines);
-    Assert.assertEquals(6, lines.size());
+    // 1) read 5 lines in file
+    runSpout(spout, "r5");
+
+    // 2) check log file contents
+    String lockFile = listDir(spout.getLockDirPath()).get(0);
+    List<String> lines = readTextFile(fs, lockFile);
+    Assert.assertEquals(lines.size(), 1);
+    Thread.sleep(3000); // allow freq_sec to expire
+
+    // 3) read another line and see if another log entry was made
+    runSpout(spout, "r1");
+    lines = readTextFile(fs, lockFile);
+    Assert.assertEquals(2, lines.size());
   }
 
   private static List<String> readTextFile(FileSystem fs, String f) throws IOException {
@@ -320,7 +392,7 @@ public class TestHdfsSpout {
   }
 
   /**
-   * Execute a sequence of calls to EventHubSpout.
+   * Execute a sequence of calls on HdfsSpout.
    *
    * @param cmds: set of commands to run,
    * e.g. "r,r,r,r,a1,f2,...". The commands are:
@@ -427,7 +499,8 @@ public class TestHdfsSpout {
 
 
-  // Throws exceptions for 2nd and 3rd line read attempt
+  // Throws IOExceptions for 3rd & 4th call to next(), succeeds on 5th, thereafter
+  // throws ParseException. Effectively produces 3 lines (1, 2 & 3) from each file read
   static class MockTextFailingReader extends TextFileReader {
     int readAttempts = 0;
 
@@ -438,9 +511,9 @@ public class TestHdfsSpout {
     @Override
     public List<Object> next() throws IOException, ParseException {
       readAttempts++;
-      if (readAttempts == 2) {
+      if (readAttempts == 3 || readAttempts == 4) {
         throw new IOException("mock test exception");
-      } else if (readAttempts >= 3) {
+      } else if (readAttempts > 5) {
         throw new ParseException("mock test exception", null);
       }
       return super.next();

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/resources/log4j.properties b/external/storm-hdfs/src/test/resources/log4j.properties
new file mode 100644
index 0000000..1f92e45
--- /dev/null
+++ b/external/storm-hdfs/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+log4j.rootLogger = WARN, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.storm.hdfs = INFO
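Finally, a hedged example of how the new tests drive these knobs end to end (getDefaultConfig, makeSpout and runSpout are helpers in TestHdfsSpout; the values are illustrative and follow the semantics noted in Configs.java):

    Map conf = getDefaultConfig();
    conf.put(Configs.COMMIT_FREQ_COUNT, "1");  // log progress to the lock file after every acked tuple; "0" disables
    conf.put(Configs.COMMIT_FREQ_SEC, "100");  // time-based commits cannot be disabled, so push them past the test window
    conf.put(Configs.LOCK_TIMEOUT, "30");      // secs of silence before a lock becomes a takeover candidate
    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
    runSpout(spout, "r5");                     // "r5" = five nextTuple() calls, per the runSpout command mini-language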
