GitHub user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/936#discussion_r49158800
--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java ---
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.storm.hdfs.common.HdfsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Facility to synchronize access to HDFS directory. The lock itself is represented
+ * as a file in the same directory. Relies on atomic file creation.
+ */
+public class DirLock {
+ private FileSystem fs;
+ private final Path lockFile;
+ public static final String DIR_LOCK_FILE = "DIRLOCK";
+ private static final Logger LOG = LoggerFactory.getLogger(DirLock.class);
+ private DirLock(FileSystem fs, Path lockFile) throws IOException {
+ if( fs.isDirectory(lockFile) ) {
+ throw new IllegalArgumentException(lockFile.toString() + " is not a directory");
+ }
+ this.fs = fs;
+ this.lockFile = lockFile;
+ }
+
+ /** Get a lock on file if not already locked
+ *
+ * @param fs
+ * @param dir the dir on which to get a lock
+ * @return The lock object if it the lock was acquired. Returns null if the dir is already locked.
+ * @throws IOException if there were errors
+ */
+ public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
+ Path lockFile = getDirLockFile(dir);
+
+ try {
+ FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
--- End diff --
The RawLocalFileSystem implementation does not seem to guarantee atomic
file creation -
[link](https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java#L299).
In distributed mode the NameNode could guarantee atomicity by locking, but I
haven't checked the details. Did you run the tests in local or distributed
mode? Either way, it would be hard to get the order and timing right for the
tests to fail.
It might be good to add some comments/docs calling out that the locking
semantics depend on the guarantees of the underlying FileSystem implementation.
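To make the dependency concrete, here is a minimal sketch of the same lock-by-file-creation pattern written against plain `java.nio.file` instead of the Hadoop `FileSystem` API. It is only an illustration of what "relies on atomic file creation" means, not the code in this PR. The class `LocalDirLockSketch` and its methods are hypothetical names. It assumes a local filesystem where `Files.createFile` behaves as documented, that is, the existence check and the creation happen as one atomic operation.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical local-filesystem analogue of DirLock; not part of Storm or Hadoop. */
class LocalDirLockSketch {
    static final String DIR_LOCK_FILE = "DIRLOCK";

    /** Returns the lock file path if the lock was acquired, or null if the dir is already locked. */
    static Path tryLock(Path dir) throws IOException {
        Path lockFile = dir.resolve(DIR_LOCK_FILE);
        try {
            // Files.createFile is documented to check for existence and create
            // the file as a single atomic operation, so only one caller can win.
            return Files.createFile(lockFile);
        } catch (FileAlreadyExistsException e) {
            // Someone else already holds the lock.
            return null;
        }
    }

    /** Releases the lock by deleting the lock file. */
    static void release(Path lockFile) throws IOException {
        Files.deleteIfExists(lockFile);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("dirlock-sketch");
        Path first = tryLock(dir);
        Path second = tryLock(dir);
        System.out.println("first acquired: " + (first != null));
        System.out.println("second acquired: " + (second != null));
        release(first);
        Files.delete(dir);
    }
}
```

The second `tryLock` call returns null only because the create step refuses to succeed when the file already exists. If the underlying create were a separate "exists?" check followed by a create, two callers could both pass the check and both believe they hold the lock, which is the concern with the local implementation.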