Fix DirLock and add more tests for it. Upgrade Hadoop from 2.6.0 to 2.6.1 because 
2.6.0 has a problem with concurrent file creation.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2fb0d7d9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2fb0d7d9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2fb0d7d9

Branch: refs/heads/1.x-branch
Commit: 2fb0d7d980c4f8c328905249b3ff5ed5e64c1558
Parents: 5793cdd
Author: Roshan Naik <[email protected]>
Authored: Thu Dec 10 18:01:20 2015 -0800
Committer: Roshan Naik <[email protected]>
Committed: Thu Jan 14 11:34:55 2016 -0800

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/spout/DirLock.java    | 21 ++++--
 .../apache/storm/hdfs/spout/TestDirLock.java    | 68 ++++++++++++++------
 pom.xml                                         |  4 +-
 3 files changed, 69 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2fb0d7d9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index ef02a8f..304f26d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -19,13 +19,15 @@
 package org.apache.storm.hdfs.spout;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 
 public class DirLock {
   private FileSystem fs;
@@ -43,26 +45,37 @@ public class DirLock {
    *
    * @param fs
    * @param dir  the dir on which to get a lock
-   * @return lock object
+   * @return The lock object if the lock was acquired. Returns null if the 
dir is already locked.
    * @throws IOException if there were errors
    */
   public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
     Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + 
DIR_LOCK_FILE );
     try {
       FSDataOutputStream os = fs.create(lockFile, false);
-      if(log.isInfoEnabled()) {
-        log.info("Thread acquired dir lock  " + threadInfo() + " - lockfile " 
+ lockFile);
+      if (log.isInfoEnabled()) {
+        log.info("Thread ({}) acquired lock on dir {}", threadInfo(), dir);
       }
       os.close();
       return new DirLock(fs, lockFile);
     } catch (FileAlreadyExistsException e) {
+      log.info("Thread ({}) cannot lock dir {} as its already locked.", 
threadInfo(), dir);
       return null;
+    } catch (RemoteException e) {
+      if( 
e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) ) {
+        log.info("Thread ({}) cannot lock dir {} as its already locked.", 
threadInfo(), dir);
+        return null;
+      } else { // unexpected error
+        log.error("Error when acquiring lock on dir " + dir, e);
+        throw e;
+      }
     }
   }
 
   private static String threadInfo () {
     return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" + 
Thread.currentThread().getName();
   }
+
+  /** Release lock on dir by deleting the lock file */
   public void release() throws IOException {
     fs.delete(lockFile, false);
     log.info("Thread {} released dir lock {} ", threadInfo(), lockFile);

http://git-wip-us.apache.org/repos/asf/storm/blob/2fb0d7d9/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
index 9686fd8..fcfe704 100644
--- 
a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
+++ 
b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -20,6 +20,7 @@ package org.apache.storm.hdfs.spout;
 
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -44,7 +45,7 @@ public class TestDirLock {
   static MiniDFSCluster hdfsCluster;
   static FileSystem fs;
   static String hdfsURI;
-  static Configuration conf = new  HdfsConfiguration();
+  static HdfsConfiguration conf = new  HdfsConfiguration();
 
 
   @Rule
@@ -54,6 +55,7 @@ public class TestDirLock {
 
   @BeforeClass
   public static void setupClass() throws IOException {
+    conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000");
     builder = new MiniDFSCluster.Builder(new Configuration());
     hdfsCluster = builder.build();
     fs  = hdfsCluster.getFileSystem();
@@ -76,19 +78,36 @@ public class TestDirLock {
     fs.delete(lockDir, true);
   }
 
-//  @Test
+
+  @Test
+  public void testBasicLocking() throws Exception {
+    // 1 grab lock
+    DirLock lock = DirLock.tryLock(fs, lockDir);
+    Assert.assertTrue(fs.exists(lock.getLockFile()));
+
+    // 2 try to grab another lock while dir is locked
+    DirLock lock2 = DirLock.tryLock(fs, lockDir); // should fail
+    Assert.assertNull(lock2);
+
+    // 3 let go first lock
+    lock.release();
+    Assert.assertFalse(fs.exists(lock.getLockFile()));
+
+    // 4 try locking again
+    lock2  = DirLock.tryLock(fs, lockDir);
+    Assert.assertTrue(fs.exists(lock2.getLockFile()));
+    lock2.release();
+    Assert.assertFalse(fs.exists(lock.getLockFile()));
+    lock2.release();  // should be throw
+  }
+
+
+  @Test
   public void testConcurrentLocking() throws Exception {
-//    -Dlog4j.configuration=config
-    Logger.getRootLogger().setLevel(Level.ERROR);
-    DirLockingThread[] thds = startThreads(10, lockDir );
-    for (DirLockingThread thd : thds) {
-      thd.start();
-    }
-    System.err.println("Thread creation complete");
-    Thread.sleep(5000);
+    DirLockingThread[] thds = startThreads(100, lockDir );
     for (DirLockingThread thd : thds) {
-      thd.join(1000);
-      if(thd.isAlive() && thd.cleanExit)
+      thd.join();
+      if( !thd.cleanExit)
         System.err.println(thd.getName() + " did not exit cleanly");
       Assert.assertTrue(thd.cleanExit);
     }
@@ -97,14 +116,16 @@ public class TestDirLock {
     Assert.assertFalse(fs.exists(lockFile));
   }
 
-
-
   private DirLockingThread[] startThreads(int thdCount, Path dir)
           throws IOException {
     DirLockingThread[] result = new DirLockingThread[thdCount];
     for (int i = 0; i < thdCount; i++) {
       result[i] = new DirLockingThread(i, fs, dir);
     }
+
+    for (DirLockingThread thd : result) {
+      thd.start();
+    }
     return result;
   }
 
@@ -123,20 +144,31 @@ public class TestDirLock {
 
     @Override
     public void run() {
+      DirLock lock = null;
       try {
-        DirLock lock;
         do {
+          System.err.println("Trying lock " + getName());
           lock = DirLock.tryLock(fs, dir);
+          System.err.println("Acquired lock " + getName());
           if(lock==null) {
             System.out.println("Retrying lock - " + 
Thread.currentThread().getId());
           }
         } while (lock==null);
-        lock.release();
         cleanExit= true;
-      } catch (IOException e) {
+      } catch (Exception e) {
         e.printStackTrace();
       }
-
+      finally {
+          try {
+            if(lock!=null) {
+              lock.release();
+              System.err.println("Released lock " + getName());
+            }
+          } catch (IOException e) {
+            e.printStackTrace(System.err);
+          }
+      }
+      System.err.println("Thread exiting " + getName());
     }
 
   }

http://git-wip-us.apache.org/repos/asf/storm/blob/2fb0d7d9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 610f7e9..fed5d3b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,7 +213,7 @@
         <clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
         <disruptor.version>3.3.2</disruptor.version>
         <jgrapht.version>0.9.0</jgrapht.version>
-        <guava.version>16.0.1</guava.version>
+        <guava.version>15.0</guava.version>
         <netty.version>3.9.0.Final</netty.version>
         <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
         <log4j.version>2.1</log4j.version>
@@ -227,7 +227,7 @@
         <clojure-data-codec.version>0.1.0</clojure-data-codec.version>
         <clojure-contrib.version>1.2.0</clojure-contrib.version>
         <hive.version>0.14.0</hive.version>
-        <hadoop.version>2.6.0</hadoop.version>
+        <hadoop.version>2.6.1</hadoop.version>
         <kryo.version>2.21</kryo.version>
         <servlet.version>2.5</servlet.version>
         <joda-time.version>2.3</joda-time.version>

Reply via email to