fixing FileLock and sharing code with DirLock for file creation logic
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dcc930b9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dcc930b9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dcc930b9 Branch: refs/heads/1.x-branch Commit: dcc930b9ff663f7539dd00e49a68e1bcdcf308d4 Parents: 2fb0d7d Author: Roshan Naik <[email protected]> Authored: Thu Dec 10 19:23:59 2015 -0800 Committer: Roshan Naik <[email protected]> Committed: Thu Jan 14 11:34:55 2016 -0800 ---------------------------------------------------------------------- .../org/apache/storm/hdfs/common/HdfsUtils.java | 24 ++++ .../org/apache/storm/hdfs/spout/DirLock.java | 33 +++--- .../org/apache/storm/hdfs/spout/FileLock.java | 50 +++++--- .../apache/storm/hdfs/spout/TestDirLock.java | 5 - .../apache/storm/hdfs/spout/TestFileLock.java | 117 +++++++++++++++++++ 5 files changed, 192 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java index 8fc8b0d..e8c32aa 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java @@ -18,10 +18,15 @@ package org.apache.storm.hdfs.common; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import 
org.apache.hadoop.ipc.RemoteException; +import org.apache.storm.hdfs.spout.DirLock; import java.io.IOException; import java.util.ArrayList; @@ -55,6 +60,25 @@ public class HdfsUtils { return result; } + /** + * Returns the output stream of the newly created file if creation succeeded, or null if the file already exists (or is already being created). Throws if there was an unexpected problem + */ + public static FSDataOutputStream tryCreateFile(FileSystem fs, Path file) throws IOException { + try { + FSDataOutputStream os = fs.create(file, false); + return os; + } catch (FileAlreadyExistsException e) { + return null; + } catch (RemoteException e) { + if( e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) ) { + return null; + } else { // unexpected error + throw e; + } + } + } + + public static class Pair<K,V> { private K key; private V value; http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java index 304f26d..0ff2f37 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java @@ -21,14 +21,16 @@ package org.apache.storm.hdfs.spout; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; -import org.apache.hadoop.ipc.RemoteException; +import org.apache.storm.hdfs.common.HdfsUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import org.apache.hadoop.fs.FileAlreadyExistsException; +/** + * Facility to synchronize access to HDFS directory. The lock itself is represented + * as a file in the same directory. Relies on atomic file creation. 
+ */ public class DirLock { private FileSystem fs; private final Path lockFile; @@ -41,7 +43,7 @@ public class DirLock { this.lockFile = lockFile; } - /** Returns null if somebody else has a lock + /** Get a lock on dir if not already locked * * @param fs * @param dir the dir on which to get a lock @@ -50,29 +52,26 @@ public class DirLock { */ public static DirLock tryLock(FileSystem fs, Path dir) throws IOException { Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE ); + try { - FSDataOutputStream os = fs.create(lockFile, false); - if (log.isInfoEnabled()) { + FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile); + if (ostream!=null) { log.info("Thread ({}) acquired lock on dir {}", threadInfo(), dir); - } - os.close(); - return new DirLock(fs, lockFile); - } catch (FileAlreadyExistsException e) { - log.info("Thread ({}) cannot lock dir {} as its already locked.", threadInfo(), dir); - return null; - } catch (RemoteException e) { - if( e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) ) { + ostream.close(); + return new DirLock(fs, lockFile); + } else { log.info("Thread ({}) cannot lock dir {} as its already locked.", threadInfo(), dir); return null; - } else { // unexpected error + } + } catch (IOException e) { log.error("Error when acquiring lock on dir " + dir, e); throw e; - } } } private static String threadInfo () { - return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" + Thread.currentThread().getName(); + return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" + + Thread.currentThread().getName(); } /** Release lock on dir by deleting the lock file */ http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java index f4a6813..1974e44 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java @@ -28,26 +28,32 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.util.Collection; +/** + * Facility to synchronize access to HDFS files. A thread gains exclusive access to a file by acquiring + * a FileLock object. The lock itself is represented as a file on HDFS. Relies on atomic file creation. + * The owning thread must heartbeat periodically on the lock to prevent the lock from being deemed + * stale (i.e. a lock whose owning thread has died). + */ public class FileLock { private final FileSystem fs; private final String componentID; private final Path lockFile; - private final FSDataOutputStream stream; + private final DataOutputStream lockFileStream; private LogEntry lastEntry; private static final Logger log = LoggerFactory.getLogger(DirLock.class); - private FileLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId) + private FileLock(FileSystem fs, Path lockFile, DataOutputStream lockFileStream, String spoutId) throws IOException { this.fs = fs; - String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName(); - this.lockFile = new Path(lockFileName); - this.stream = fs.create(lockFile); + this.lockFile = lockFile; + this.lockFileStream = lockFileStream; this.componentID = spoutId; logProgress("0", false); } @@ -56,7 +62,7 @@ public class FileLock { throws IOException { this.fs = fs; this.lockFile = lockFile; - this.stream = fs.append(lockFile); + this.lockFileStream = fs.append(lockFile); this.componentID = spoutId; log.debug("Acquired abandoned lockFile {}", lockFile); logProgress(entry.fileOffset, 
true); @@ -74,22 +80,37 @@ public class FileLock { LogEntry entry = new LogEntry(now, componentID, fileOffset); String line = entry.toString(); if(prefixNewLine) - stream.writeBytes(System.lineSeparator() + line); + lockFileStream.writeBytes(System.lineSeparator() + line); else - stream.writeBytes(line); - stream.flush(); + lockFileStream.writeBytes(line); + lockFileStream.flush(); lastEntry = entry; // update this only after writing to hdfs } public void release() throws IOException { - stream.close(); + lockFileStream.close(); fs.delete(lockFile, false); } - // throws exception immediately if not able to acquire lock - public static FileLock tryLock(FileSystem hdfs, Path fileToLock, Path lockDirPath, String spoutId) + /** returns lock on file or null if file is already locked. throws if unexpected problem */ + public static FileLock tryLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId) throws IOException { - return new FileLock(hdfs, fileToLock, lockDirPath, spoutId); + String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName(); + Path lockFile = new Path(lockFileName); + + try { + FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile); + if (ostream != null) { + log.info("Acquired lock on file {}. 
LockFile=", fileToLock, lockFile); + return new FileLock(fs, lockFile, ostream, spoutId); + } else { + log.info("Cannot lock file {} as its already locked.", fileToLock); + return null; + } + } catch (IOException e) { + log.error("Error when acquiring lock on file " + fileToLock, e); + throw e; + } } /** @@ -105,7 +126,7 @@ public class FileLock { public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan) throws IOException { if( fs.getFileStatus(lockFile).getModificationTime() >= olderThan ) { - // HDFS timestamp may not reflect recent updates, so we double check the + //Impt: HDFS timestamp may not reflect recent appends, so we double check the // timestamp in last line of file to see when the last update was made LogEntry lastEntry = getLastEntry(fs, lockFile); if(lastEntry==null) { @@ -136,7 +157,6 @@ public class FileLock { } // takes ownership of the lock file - /** * Takes ownership of the lock file. * @param lockFile http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java index fcfe704..bdb0cdf 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java @@ -40,19 +40,14 @@ import java.io.IOException; public class TestDirLock { - static MiniDFSCluster.Builder builder; static MiniDFSCluster hdfsCluster; static FileSystem fs; static String hdfsURI; static HdfsConfiguration conf = new HdfsConfiguration(); - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); private Path lockDir = new Path("/tmp/lockdir"); - @BeforeClass public static void setupClass() throws IOException { 
conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000"); http://git-wip-us.apache.org/repos/asf/storm/blob/dcc930b9/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java new file mode 100644 index 0000000..8031041 --- /dev/null +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.hdfs.spout; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +public class TestFileLock { + + static MiniDFSCluster.Builder builder; + static MiniDFSCluster hdfsCluster; + static FileSystem fs; + static String hdfsURI; + static HdfsConfiguration conf = new HdfsConfiguration(); + + private Path filesDir = new Path("/tmp/lockdir"); + private Path locksDir = new Path("/tmp/lockdir"); + + @BeforeClass + public static void setupClass() throws IOException { + conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000"); + builder = new MiniDFSCluster.Builder(new Configuration()); + hdfsCluster = builder.build(); + fs = hdfsCluster.getFileSystem(); + hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/"; + } + + @AfterClass + public static void teardownClass() throws IOException { + fs.close(); + hdfsCluster.shutdown(); + } + + @Before + public void setUp() throws Exception { + assert fs.mkdirs(filesDir) ; + } + + @After + public void tearDown() throws Exception { + fs.delete(filesDir, true); + } + + @Test + public void testBasic() throws Exception { + // create empty files in filesDir + Path file1 = new Path(filesDir + Path.SEPARATOR + "file1"); + Path file2 = new Path(filesDir + Path.SEPARATOR + "file2"); + fs.create(file1).close(); + fs.create(file2).close(); // create empty file + + // acquire lock on file1 and verify if worked + FileLock lock1a = FileLock.tryLock(fs, file1, locksDir, "spout1"); + Assert.assertNotNull(lock1a); + Assert.assertTrue(fs.exists(lock1a.getLockFile())); + 
Assert.assertEquals(lock1a.getLockFile().getParent(), locksDir); // verify lock file location + Assert.assertEquals(lock1a.getLockFile().getName(), file1.getName()); // verify lock filename + + // acquire another lock on file1 and verify it failed + FileLock lock1b = FileLock.tryLock(fs, file1, locksDir, "spout1"); + Assert.assertNull(lock1b); + + // release lock on file1 and check + lock1a.release(); + Assert.assertFalse(fs.exists(lock1a.getLockFile())); + + // Retry locking and verify + FileLock lock1c = FileLock.tryLock(fs, file1, locksDir, "spout1"); + Assert.assertNotNull(lock1c); + Assert.assertTrue(fs.exists(lock1c.getLockFile())); + Assert.assertEquals(lock1c.getLockFile().getParent(), locksDir); // verify lock file location + Assert.assertEquals(lock1c.getLockFile().getName(), file1.getName()); // verify lock filename + + // try locking another file2 at the same time + FileLock lock2a = FileLock.tryLock(fs, file2, locksDir, "spout1"); + Assert.assertNotNull(lock2a); + Assert.assertTrue(fs.exists(lock2a.getLockFile())); + Assert.assertEquals(lock2a.getLockFile().getParent(), locksDir); // verify lock file location + Assert.assertEquals(lock2a.getLockFile().getName(), file1.getName()); // verify lock filename + + // release both locks + lock2a.release(); + Assert.assertFalse(fs.exists(lock2a.getLockFile())); + lock1c.release(); + Assert.assertFalse(fs.exists(lock1c.getLockFile())); + } + + +}
