[
https://issues.apache.org/jira/browse/STORM-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15057411#comment-15057411
]
ASF GitHub Bot commented on STORM-1199:
---------------------------------------
Github user satishd commented on a diff in the pull request:
https://github.com/apache/storm/pull/936#discussion_r47599629
--- Diff:
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java ---
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.storm.hdfs.common.HdfsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Collection;
+
+/**
+ * Facility to synchronize access to HDFS files. A thread gains exclusive access to a file by
+ * acquiring a FileLock object. The lock itself is represented as a file on HDFS. Relies on
+ * atomic file creation. The owning thread must heartbeat periodically on the lock to prevent
+ * the lock from being deemed stale (i.e. a lock whose owning thread has died).
+ */
+public class FileLock {
+
+ private final FileSystem fs;
+ private final String componentID;
+ private final Path lockFile;
+ private final FSDataOutputStream lockFileStream;
+ private LogEntry lastEntry;
+
+ private static final Logger log = LoggerFactory.getLogger(FileLock.class);
+
+ private FileLock(FileSystem fs, Path lockFile, FSDataOutputStream lockFileStream, String spoutId)
+         throws IOException {
+ this.fs = fs;
+ this.lockFile = lockFile;
+ this.lockFileStream = lockFileStream;
+ this.componentID = spoutId;
+ logProgress("0", false);
+ }
+
+ private FileLock(FileSystem fs, Path lockFile, String spoutId, LogEntry entry)
+         throws IOException {
+ this.fs = fs;
+ this.lockFile = lockFile;
+ this.lockFileStream = fs.append(lockFile);
+ this.componentID = spoutId;
+ log.debug("Acquired abandoned lockFile {}", lockFile);
+ logProgress(entry.fileOffset, true);
+ }
+
+ public void heartbeat(String fileOffset) throws IOException {
+ logProgress(fileOffset, true);
+ }
+
+ // new line is at beginning of each line (instead of end) for better recovery from
+ // partial writes of prior lines
+ private void logProgress(String fileOffset, boolean prefixNewLine)
+         throws IOException {
+ long now = System.currentTimeMillis();
+ LogEntry entry = new LogEntry(now, componentID, fileOffset);
+ String line = entry.toString();
+ if(prefixNewLine)
+ lockFileStream.writeBytes(System.lineSeparator() + line);
+ else
+ lockFileStream.writeBytes(line);
+ lockFileStream.hflush();
+
+ lastEntry = entry; // update this only after writing to hdfs
+ }
+
+ public void release() throws IOException {
+ lockFileStream.close();
+ fs.delete(lockFile, false);
--- End diff --
fs.delete() returns false if the file is not deleted. You may want to throw an
IOException in that case, since the caller of this method assumes the file was deleted
unless an IOException is thrown. We may also want to log a warning when the delete is
not successful.
Can we have javadoc for this method describing the contract?
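The suggested contract can be sketched with a local-filesystem analogue (a hypothetical `releaseLockFile` helper using `java.nio.file.Files` in place of Hadoop's `FileSystem.delete`, since the actual fix would live inside `FileLock.release()`): check the boolean result of the delete and surface failure as an IOException rather than returning silently.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DeleteContract {
    /**
     * Deletes the given lock file.
     * Contract: returns normally only if the file was actually deleted;
     * otherwise throws IOException. (Analogue of the behavior proposed for
     * FileLock.release(), using the local filesystem instead of HDFS.)
     */
    static void releaseLockFile(Path lockFile) throws IOException {
        // Files.deleteIfExists returns false when the file was already gone;
        // treat that as a failed release, mirroring the suggestion to throw
        // when fs.delete() returns false.
        if (!Files.deleteIfExists(lockFile)) {
            throw new IOException("Unable to delete lock file: " + lockFile);
        }
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("filelock", ".lock");
        releaseLockFile(f); // succeeds: file existed and was removed
        try {
            releaseLockFile(f); // file already gone -> IOException
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Note that Hadoop's `FileSystem.delete(path, recursive)` also returns false when the path does not exist, so the real implementation may additionally want to log a warning distinguishing "already gone" from other failures.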
> Create HDFS Spout
> -----------------
>
> Key: STORM-1199
> URL: https://issues.apache.org/jira/browse/STORM-1199
> Project: Apache Storm
> Issue Type: New Feature
> Reporter: Roshan Naik
> Assignee: Roshan Naik
> Attachments: HDFSSpoutforStorm v2.pdf, HDFSSpoutforStorm.pdf,
> hdfs-spout.1.patch
>
>
> Create an HDFS spout so that Storm can suck in data from files in a HDFS
> directory
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)