Fixing DirLock. Additional tests for it. Due to a problem in Hadoop 2.6.0 with concurrent file creation, upgrading to Hadoop 2.6.1.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2fb0d7d9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2fb0d7d9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2fb0d7d9 Branch: refs/heads/1.x-branch Commit: 2fb0d7d980c4f8c328905249b3ff5ed5e64c1558 Parents: 5793cdd Author: Roshan Naik <[email protected]> Authored: Thu Dec 10 18:01:20 2015 -0800 Committer: Roshan Naik <[email protected]> Committed: Thu Jan 14 11:34:55 2016 -0800 ---------------------------------------------------------------------- .../org/apache/storm/hdfs/spout/DirLock.java | 21 ++++-- .../apache/storm/hdfs/spout/TestDirLock.java | 68 ++++++++++++++------ pom.xml | 4 +- 3 files changed, 69 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/2fb0d7d9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java index ef02a8f..304f26d 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java @@ -19,13 +19,15 @@ package org.apache.storm.hdfs.spout; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.ipc.RemoteException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import org.apache.hadoop.fs.FileAlreadyExistsException; public class DirLock { private FileSystem fs; @@ -43,26 +45,37 @@ public class DirLock 
{ * * @param fs * @param dir the dir on which to get a lock - * @return lock object + * @return The lock object if the lock was acquired. Returns null if the dir is already locked. * @throws IOException if there were errors */ public static DirLock tryLock(FileSystem fs, Path dir) throws IOException { Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE ); try { FSDataOutputStream os = fs.create(lockFile, false); - if(log.isInfoEnabled()) { - log.info("Thread acquired dir lock " + threadInfo() + " - lockfile " + lockFile); + if (log.isInfoEnabled()) { + log.info("Thread ({}) acquired lock on dir {}", threadInfo(), dir); } os.close(); return new DirLock(fs, lockFile); } catch (FileAlreadyExistsException e) { + log.info("Thread ({}) cannot lock dir {} as its already locked.", threadInfo(), dir); return null; + } catch (RemoteException e) { + if( e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) ) { + log.info("Thread ({}) cannot lock dir {} as its already locked.", threadInfo(), dir); + return null; + } else { // unexpected error + log.error("Error when acquiring lock on dir " + dir, e); + throw e; + } } } private static String threadInfo () { return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" + Thread.currentThread().getName(); } + + /** Release lock on dir by deleting the lock file */ public void release() throws IOException { fs.delete(lockFile, false); log.info("Thread {} released dir lock {} ", threadInfo(), lockFile); http://git-wip-us.apache.org/repos/asf/storm/blob/2fb0d7d9/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java index 9686fd8..fcfe704 100644 --- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java @@ -20,6 +20,7 @@ package org.apache.storm.hdfs.spout; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -44,7 +45,7 @@ public class TestDirLock { static MiniDFSCluster hdfsCluster; static FileSystem fs; static String hdfsURI; - static Configuration conf = new HdfsConfiguration(); + static HdfsConfiguration conf = new HdfsConfiguration(); @Rule @@ -54,6 +55,7 @@ public class TestDirLock { @BeforeClass public static void setupClass() throws IOException { + conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000"); builder = new MiniDFSCluster.Builder(new Configuration()); hdfsCluster = builder.build(); fs = hdfsCluster.getFileSystem(); @@ -76,19 +78,36 @@ public class TestDirLock { fs.delete(lockDir, true); } -// @Test + + @Test + public void testBasicLocking() throws Exception { + // 1 grab lock + DirLock lock = DirLock.tryLock(fs, lockDir); + Assert.assertTrue(fs.exists(lock.getLockFile())); + + // 2 try to grab another lock while dir is locked + DirLock lock2 = DirLock.tryLock(fs, lockDir); // should fail + Assert.assertNull(lock2); + + // 3 let go first lock + lock.release(); + Assert.assertFalse(fs.exists(lock.getLockFile())); + + // 4 try locking again + lock2 = DirLock.tryLock(fs, lockDir); + Assert.assertTrue(fs.exists(lock2.getLockFile())); + lock2.release(); + Assert.assertFalse(fs.exists(lock.getLockFile())); + lock2.release(); // should throw + } + + + @Test public void testConcurrentLocking() throws Exception { -// -Dlog4j.configuration=config - Logger.getRootLogger().setLevel(Level.ERROR); - DirLockingThread[] thds = startThreads(10, lockDir ); - for (DirLockingThread thd : thds) { - thd.start(); - } - 
System.err.println("Thread creation complete"); - Thread.sleep(5000); + DirLockingThread[] thds = startThreads(100, lockDir ); for (DirLockingThread thd : thds) { - thd.join(1000); - if(thd.isAlive() && thd.cleanExit) + thd.join(); + if( !thd.cleanExit) System.err.println(thd.getName() + " did not exit cleanly"); Assert.assertTrue(thd.cleanExit); } @@ -97,14 +116,16 @@ public class TestDirLock { Assert.assertFalse(fs.exists(lockFile)); } - - private DirLockingThread[] startThreads(int thdCount, Path dir) throws IOException { DirLockingThread[] result = new DirLockingThread[thdCount]; for (int i = 0; i < thdCount; i++) { result[i] = new DirLockingThread(i, fs, dir); } + + for (DirLockingThread thd : result) { + thd.start(); + } return result; } @@ -123,20 +144,31 @@ public class TestDirLock { @Override public void run() { + DirLock lock = null; try { - DirLock lock; do { + System.err.println("Trying lock " + getName()); lock = DirLock.tryLock(fs, dir); + System.err.println("Acquired lock " + getName()); if(lock==null) { System.out.println("Retrying lock - " + Thread.currentThread().getId()); } } while (lock==null); - lock.release(); cleanExit= true; - } catch (IOException e) { + } catch (Exception e) { e.printStackTrace(); } - + finally { + try { + if(lock!=null) { + lock.release(); + System.err.println("Released lock " + getName()); + } + } catch (IOException e) { + e.printStackTrace(System.err); + } + } + System.err.println("Thread exiting " + getName()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/2fb0d7d9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 610f7e9..fed5d3b 100644 --- a/pom.xml +++ b/pom.xml @@ -213,7 +213,7 @@ <clojure.tools.cli.version>0.2.4</clojure.tools.cli.version> <disruptor.version>3.3.2</disruptor.version> <jgrapht.version>0.9.0</jgrapht.version> - <guava.version>16.0.1</guava.version> + <guava.version>15.0</guava.version> <netty.version>3.9.0.Final</netty.version> 
<log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version> <log4j.version>2.1</log4j.version> @@ -227,7 +227,7 @@ <clojure-data-codec.version>0.1.0</clojure-data-codec.version> <clojure-contrib.version>1.2.0</clojure-contrib.version> <hive.version>0.14.0</hive.version> - <hadoop.version>2.6.0</hadoop.version> + <hadoop.version>2.6.1</hadoop.version> <kryo.version>2.21</kryo.version> <servlet.version>2.5</servlet.version> <joda-time.version>2.3</joda-time.version>
