[
https://issues.apache.org/jira/browse/STORM-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089490#comment-15089490
]
ASF GitHub Bot commented on STORM-1199:
---------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/936#discussion_r49209735
--- Diff: external/storm-hdfs/README.md ---
@@ -405,7 +410,123 @@ On worker hosts the bolt/trident-state code will use
the keytab file with princi
Namenode. This method is a little dangerous as you need to ensure all
workers have the keytab file at the same location, and you need
to remember this as you bring up new hosts in the cluster.
-## License
+---
+
+# HDFS Spout
+
+Hdfs spout is intended to allow feeding data into Storm from an HDFS directory.
+It will actively monitor the directory and consume any new files that appear in the directory.
+HDFS spout does not currently support Trident.
+
+**Important**: Hdfs spout assumes that the files being made visible to it in the monitored directory
+are NOT actively being written to. Only after a file is completely written should it be made
+visible to the spout. This can be achieved either by writing the file out to another directory
+and, once it is completely written, moving it to the monitored directory, or by creating the file
+with a '.ignore' suffix in the monitored directory and renaming it to drop the suffix after the
+data is completely written. File names with a '.ignore' suffix are ignored by the spout.
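+
+A minimal producer-side sketch of the second approach (write under a '.ignore' suffix, then rename),
+assuming the standard Hadoop FileSystem API; the paths and file names below are illustrative only
+(FileSystem, Path and FSDataOutputStream are from org.apache.hadoop.fs, StandardCharsets from java.nio.charset):
+
+```java
+FileSystem fs = FileSystem.get(new org.apache.hadoop.conf.Configuration());
+
+// 1) Write the file with a '.ignore' suffix so the spout skips it while it is incomplete.
+Path tmp = new Path("/source/data-0001.txt.ignore");   // illustrative path
+try (FSDataOutputStream out = fs.create(tmp)) {
+    out.write("record-1\nrecord-2\n".getBytes(StandardCharsets.UTF_8));
+}
+
+// 2) Rename to drop the suffix only after the file is completely written;
+//    the spout will then pick it up from the monitored directory.
+fs.rename(tmp, new Path("/source/data-0001.txt"));
+```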
+
+When the spout is actively consuming a file, it renames the file with a '.inprogress' suffix.
+After all the contents of the file have been consumed, the file is moved to a configurable *done*
+directory and the '.inprogress' suffix is dropped.
+
+**Concurrency** If multiple spout instances are used in the topology, each instance will consume
+a different file. Synchronization among spout instances is done using lock files created in a
+(by default) '.lock' subdirectory under the monitored directory. A file with the same name
+as the file being consumed (without the '.inprogress' suffix) is created in the lock directory.
+Once the file is completely consumed, the corresponding lock file is deleted.
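+
+For illustration, running multiple instances only requires setting the spout's parallelism; the
+lock-file coordination described above is handled by the spout itself. A minimal sketch, where the
+parallelism value is arbitrary:
+
+```java
+// No user-side synchronization is needed; each of the 4 executors below will
+// claim a different input file via a lock file in the '.lock' subdirectory.
+TopologyBuilder builder = new TopologyBuilder();
+builder.setSpout("hdfsspout",
+                 new HdfsSpout().withOutputFields(TextFileReader.defaultFields),
+                 4);  // 4 concurrent spout instances (illustrative)
+```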
+
+**Recovery from failure**
+Periodically, the spout also records in the lock file how much of the file has been consumed.
+In case of a crash of the spout instance (or a force kill of the topology), another spout can
+take over the file and resume from the location recorded in the lock file.
+
+Certain error conditions (such as the spout crashing) can leave behind lock files without deleting them.
+Such a stale lock file also indicates that the corresponding input file has not been completely
+processed. When detected, ownership of such stale lock files will be transferred to another spout.
+The configuration 'hdfsspout.lock.timeout.sec' is used to specify the duration of inactivity after
+which lock files should be considered stale. For lock file ownership transfer to succeed, the HDFS
+lease on the file (from the previous lock owner) should have expired. Spouts scan for stale lock files
+before selecting the next file for consumption.
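+
+For example, to treat lock files as stale after five minutes of inactivity (the value shown is
+illustrative, not a recommendation):
+
+```java
+Config conf = new Config();
+// A lock file untouched for longer than this many seconds is considered stale
+// and becomes eligible for ownership transfer to another spout instance.
+conf.put("hdfsspout.lock.timeout.sec", 300);
+```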
+
+**Lock on *.lock* Directory**
+Hdfs spout instances create a *DIRLOCK* file in the .lock directory to coordinate certain accesses to
+the .lock dir itself. A spout will try to create it when it needs access to the .lock directory and
+then delete it when done. In error conditions such as a topology crash, force kill or untimely death
+of a spout, this file may not get deleted. Future running instances of the spout will eventually recover
+it once the DIRLOCK file becomes stale due to inactivity for hdfsspout.lock.timeout.sec seconds.
+
+## Usage
+
+The following example creates an HDFS spout that reads text files from
HDFS path hdfs://localhost:54310/source.
+
+```java
+// Instantiate spout
+HdfsSpout textReaderSpout = new HdfsSpout().withOutputFields(TextFileReader.defaultFields);
+// HdfsSpout seqFileReaderSpout = new HdfsSpout().withOutputFields(SequenceFileReader.defaultFields);
+
+// textReaderSpout.withConfigKey("custom.keyname"); // Optional. Not normally required unless you need to change the key name used to provide HDFS settings. This key name defaults to 'hdfs.config'
+
+// Configure it
+Config conf = new Config();
+conf.put(Configs.SOURCE_DIR, "hdfs://localhost:54310/source");
+conf.put(Configs.ARCHIVE_DIR, "hdfs://localhost:54310/done");
+conf.put(Configs.BAD_DIR, "hdfs://localhost:54310/badfiles");
+conf.put(Configs.READER_TYPE, "text"); // or 'seq' for sequence files
+
+// Create & configure topology
+TopologyBuilder builder = new TopologyBuilder();
+builder.setSpout("hdfsspout", textReaderSpout, SPOUT_NUM);
+
+// Setup bolts and other topology configuration
+  ..snip..
+
+// Submit topology with config
+StormSubmitter.submitTopologyWithProgressBar("topologyName", conf, builder.createTopology());
+```
+
+See the sample HdfsSpoutTopology in storm-starter.
+
+## Configuration Settings
+Class HdfsSpout provides the following methods for configuration:
+
+`HdfsSpout withOutputFields(String... fields)` : This sets the names for
the output fields.
+The number of fields depends upon the reader being used. For convenience,
built-in reader types
+expose a static member called `defaultFields` that can be used for this.
+
+`HdfsSpout withConfigKey(String configKey)`
+Optional setting. It allows overriding the default key name ('hdfs.config') with a new name for
+specifying HDFS configs. Typically used to specify the Kerberos keytab and principal.
+
+**E.g.:**
+```java
+HashMap<String, Object> map = new HashMap<>();
+map.put("hdfs.keytab.file", "/path/to/keytab");
+map.put("hdfs.kerberos.principal", "[email protected]");
+conf.put("hdfs.config", map);
+```
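+
+If `withConfigKey` is used to change the key name, the same map is supplied under that custom key
+instead. A brief sketch building on the usage example above; the key name 'custom.keyname' is
+illustrative only:
+
+```java
+textReaderSpout.withConfigKey("custom.keyname");        // illustrative key name
+HashMap<String, Object> hdfsSettings = new HashMap<>();
+hdfsSettings.put("hdfs.keytab.file", "/path/to/keytab");
+hdfsSettings.put("hdfs.kerberos.principal", "[email protected]");
+conf.put("custom.keyname", hdfsSettings);               // must match the key passed to withConfigKey
+```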
+
+Only settings mentioned in **bold** are required.
+
+| Setting                      | Default     | Description |
+|------------------------------|-------------|-------------|
+|**hdfsspout.reader.type**     |             | Indicates the reader for the file format. Set to 'seq' for reading sequence files or 'text' for text files. Set to a fully qualified class name if using a custom type (that implements interface org.apache.storm.hdfs.spout.FileReader)|
+|**hdfsspout.hdfs**            |             | HDFS URI. Example: hdfs://namenodehost:8020 |
+|**hdfsspout.source.dir**      |             | HDFS location from which to read. E.g. /data/inputfiles |
+|**hdfsspout.archive.dir**     |             | After a file is processed completely it will be moved to this directory. E.g. /data/done |
+|**hdfsspout.badfiles.dir**    |             | If there is an error parsing a file's contents, the file is moved to this location. E.g. /data/badfiles |
+|hdfsspout.lock.dir            | '.lock' subdirectory under hdfsspout.source.dir | Directory in which lock files will be created. Concurrent HDFS spout instances synchronize using *lock* files. Before processing a file, a spout instance creates a lock file in this directory with the same name as the input file and deletes it after processing the file. Spouts also periodically record their progress (in reading the input file) in the lock file so that another spout instance can resume from that point if the spout dies for any reason.|
+|hdfsspout.ignore.suffix       | .ignore     | File names with this suffix in the hdfsspout.source.dir location will not be processed |
+|hdfsspout.commit.count        | 20000       | Record progress in the lock file after this many records are processed. If set to 0, this criterion will not be used. |
+|hdfsspout.commit.sec          | 10          | Record progress in the lock file after this many seconds have elapsed. Must be greater than 0. |
+|hdfsspout.max.outstanding     | 10000       | Limits the number of unACKed tuples by pausing tuple generation (if ACKers are used in the topology) |
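+
+A brief sketch of setting some of the optional values above on the topology config; the values shown
+are illustrative, not recommendations:
+
+```java
+conf.put("hdfsspout.commit.count", 50000);     // record progress every 50,000 records (0 disables this criterion)
+conf.put("hdfsspout.commit.sec", 30);          // also record progress every 30 seconds
+conf.put("hdfsspout.max.outstanding", 20000);  // pause tuple generation beyond 20,000 unACKed tuples
+conf.put("hdfsspout.lock.dir", "/data/inputfiles/.lock");  // explicit lock dir (illustrative path)
+```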
--- End diff --
Do we want to explain how this works with max.spout.pending, and why we
need both?
> Create HDFS Spout
> -----------------
>
> Key: STORM-1199
> URL: https://issues.apache.org/jira/browse/STORM-1199
> Project: Apache Storm
> Issue Type: New Feature
> Reporter: Roshan Naik
> Assignee: Roshan Naik
> Attachments: HDFSSpoutforStorm v2.pdf, HDFSSpoutforStorm.pdf,
> hdfs-spout.1.patch
>
>
> Create an HDFS spout so that Storm can suck in data from files in a HDFS
> directory
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)